]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: compound osd operations
authorSage Weil <sage@newdream.net>
Fri, 7 Nov 2008 04:48:55 +0000 (20:48 -0800)
committerSage Weil <sage@newdream.net>
Mon, 10 Nov 2008 00:10:51 +0000 (16:10 -0800)
17 files changed:
src/include/ceph_fs.h
src/include/types.h
src/kernel/addr.c
src/kernel/osd_client.c
src/messages/MOSDOp.h
src/messages/MOSDOpReply.h
src/messages/MOSDSubOp.h
src/messages/MOSDSubOpReply.h
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/RAID4PG.cc
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h
src/osdc/Objecter.cc
src/vstart.sh

index 819d23fcf95a60a11931976bfc55f79aef7a6144..e9527f053c5b123b725f12fb249a9dad6a9f9fed 100644 (file)
@@ -1040,6 +1040,8 @@ enum {
        /* read */
        CEPH_OSD_OP_READ       = 1,
        CEPH_OSD_OP_STAT       = 2,
+       CEPH_OSD_OP_GETXATTR   = 3,
+       CEPH_OSD_OP_GETXATTRS  = 4,
 
        /* modify */
        CEPH_OSD_OP_WRNOOP     = 10, /* write no-op (i.e. sync) */
@@ -1048,6 +1050,9 @@ enum {
        CEPH_OSD_OP_TRUNCATE   = 13,
        CEPH_OSD_OP_ZERO       = 14, /* zero extent */
        CEPH_OSD_OP_WRITEFULL  = 15, /* write complete object */
+       CEPH_OSD_OP_SETXATTR   = 16,
+       CEPH_OSD_OP_SETXATTRS  = 17,
+       CEPH_OSD_OP_RMXATTR    = 18,
 
        /* lock */
        CEPH_OSD_OP_WRLOCK     = 20,
@@ -1122,26 +1127,27 @@ enum {
        CEPH_OSD_OP_BALANCE_READS = 16,
        CEPH_OSD_OP_ACKNVRAM = 32,    /* ACK when stable in NVRAM, not RAM */
        CEPH_OSD_OP_ORDERSNAP = 64,   /* EOLDSNAP if snapc is out of order */
+       CEPH_OSD_OP_PEERSTAT = 128,   /* msg includes osd_peer_stat */
 };
 
 #define EOLDSNAPC 44 /* ORDERSNAP flag set and writer has old snap context*/
 
-struct ceph_osd_peer_stat {
-       struct ceph_timespec stamp;
-       float oprate;
-       float qlen;
-       float recent_qlen;
-       float read_latency;
-       float read_latency_mine;
-       float frac_rd_ops_shed_in;
-       float frac_rd_ops_shed_out;
+struct ceph_osd_op {
+       __le16 op;
+       union {
+               struct {
+                       __le64 offset, length;
+               };
+               struct {
+                       __le32 name_len;
+                       __le32 value_len;
+               };
+       };
 } __attribute__ ((packed));
 
 struct ceph_osd_request_head {
        ceph_tid_t                tid;
        __le32                    client_inc;
-       __le32                    op;
-       __le64                    offset, length;
        struct ceph_object        oid;
        struct ceph_object_layout layout;
        ceph_epoch_t              osdmap_epoch;
@@ -1151,26 +1157,30 @@ struct ceph_osd_request_head {
 
        struct ceph_eversion      reassert_version;
 
-       /* semi-hack, fix me */
-       __le32                    shed_count;
-       struct ceph_osd_peer_stat peer_stat;
-
        /* writer's snap context */
        __le64 snap_seq;
        __le32 num_snaps;
-       __le64 snaps[];
+
+       /* read or mutation */
+       __le16 num_ops;
+       __u8 is_modify;
+       __u8 object_type;
+       struct ceph_osd_op ops[];  /* followed by snaps */
 } __attribute__ ((packed));
 
 struct ceph_osd_reply_head {
        ceph_tid_t           tid;
-       __le32               op;
        __le32               flags;
        struct ceph_object   oid;
        struct ceph_object_layout layout;
        ceph_epoch_t         osdmap_epoch;
-       __le32               result;
-       __le64               offset, length;
        struct ceph_eversion reassert_version;
+
+       __le32 result;
+
+       __le16 num_ops;
+       __le16 is_modify;
+       struct ceph_osd_op ops[0];
 } __attribute__ ((packed));
 
 #endif
index 9dcec5b1f6bafe3daec927975f4d8d7e53490799..aa99e564375f990e28cbf7b2d6d12cdeb22ba74e 100644 (file)
@@ -212,6 +212,7 @@ WRITE_RAW_ENCODER(ceph_frag_tree_split)
 WRITE_RAW_ENCODER(ceph_inopath_item)
 WRITE_RAW_ENCODER(ceph_osd_request_head)
 WRITE_RAW_ENCODER(ceph_osd_reply_head)
+WRITE_RAW_ENCODER(ceph_osd_op)
 
 WRITE_RAW_ENCODER(ceph_mon_statfs)
 WRITE_RAW_ENCODER(ceph_mon_statfs_reply)
@@ -373,8 +374,11 @@ inline ostream& operator<<(ostream& out, const ceph_fsid& f) {
   return out << hex << f.major << '.' << f.minor << dec;
 }
 
-
-
+inline ostream& operator<<(ostream& out, const ceph_osd_op& op) {
+  out << ceph_osd_op_name(op.op);
+  out << " " << op.offset << "~" << op.length;
+  return out;
+}
 
 
 #endif
index 2be882c049f5c9d2380f1d1be3be999f5a1a00de..9caa8ace0ef7275b37523dc68deb089437698d96 100644 (file)
@@ -467,6 +467,7 @@ static void writepages_finish(struct ceph_osd_request *req)
 {
        struct inode *inode = req->r_inode;
        struct ceph_osd_reply_head *replyhead;
+       struct ceph_osd_op *op;
        struct ceph_inode_info *ci = ceph_inode(inode);
        unsigned wrote;
        loff_t offset = req->r_pages[0]->index << PAGE_CACHE_SHIFT;
@@ -481,8 +482,9 @@ static void writepages_finish(struct ceph_osd_request *req)
        /* parse reply */
        if (req->r_reply) {
                replyhead = req->r_reply->front.iov_base;
+               op = (void *)(replyhead + 1);
                rc = le32_to_cpu(replyhead->result);
-               bytes = le64_to_cpu(replyhead->length);
+               bytes = le64_to_cpu(op->length);
        }
 
        if (rc >= 0) {
index 5ac54177d6e570b8e46848873d2d0503fe59b98e..3143d3cc90cb59e47bc26fa5473ab5888d9b040f 100644 (file)
@@ -35,6 +35,7 @@ static void calc_layout(struct ceph_osd_client *osdc,
                        struct ceph_osd_request *req)
 {
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
+       struct ceph_osd_op *op = (void *)(reqhead + 1);
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
 
@@ -47,8 +48,8 @@ static void calc_layout(struct ceph_osd_client *osdc,
        if (*plen < orig_len)
                dout(10, " skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
-       reqhead->offset = cpu_to_le64(objoff);
-       reqhead->length = cpu_to_le64(objlen);
+       op->offset = cpu_to_le64(objoff);
+       op->length = cpu_to_le64(objlen);
        req->r_num_pages = calc_pages_for(off, *plen);
 
        /* pgid? */
@@ -88,12 +89,15 @@ void ceph_osdc_put_request(struct ceph_osd_request *req)
 /*
  * build osd request message only.
  */
-static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
+static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int opc,
                                        struct ceph_snap_context *snapc)
 {
        struct ceph_msg *req;
        struct ceph_osd_request_head *head;
-       size_t size = sizeof(*head);
+       struct ceph_osd_op *op;
+       __le64 *snaps;
+       size_t size = sizeof(*head) + sizeof(*op);
+       int i;
 
        if (snapc)
                size += sizeof(u64) * snapc->num_snaps;
@@ -102,17 +106,20 @@ static struct ceph_msg *new_request_msg(struct ceph_osd_client *osdc, int op,
                return req;
        memset(req->front.iov_base, 0, req->front.iov_len);
        head = req->front.iov_base;
+       op = (void *)(head + 1);
+       snaps = (void *)(op + 1);
 
        /* encode head */
-       head->op = cpu_to_le32(op);
        head->client_inc = cpu_to_le32(1); /* always, for now. */
        head->flags = 0;
+       head->num_ops = cpu_to_le16(1);
+       op->op = cpu_to_le32(opc);
 
        if (snapc) {
                head->snap_seq = cpu_to_le64(snapc->seq);
                head->num_snaps = cpu_to_le32(snapc->num_snaps);
-               memcpy(req->front.iov_base + sizeof(*head), snapc->snaps,
-                      snapc->num_snaps*sizeof(u64));
+               for (i = 0; i < snapc->num_snaps; i++)
+                       snaps[i] = cpu_to_le64(snapc->snaps[i]);
        }
        return req;
 }
@@ -879,6 +886,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_request *req;
        struct ceph_osd_request_head *reqhead;
+       struct ceph_osd_op *op;
        struct page *page;
        pgoff_t next_index;
        int contig_pages;
@@ -918,7 +926,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                  len);
        req->r_num_pages = contig_pages;
        reqhead = req->r_request->front.iov_base;
-       reqhead->length = cpu_to_le64(len);
+       op = (void *)(reqhead + 1);
+       op->length = cpu_to_le64(len);
        dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
             off, len, req->r_num_pages);
        rc = do_sync_request(osdc, req);
@@ -1026,6 +1035,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
 {
        struct ceph_msg *reqm;
        struct ceph_osd_request_head *reqhead;
+       struct ceph_osd_op *op;
        struct ceph_osd_request *req;
        int rc = 0;
 
@@ -1041,8 +1051,9 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK);
        else
                reqhead->flags = cpu_to_le32(CEPH_OSD_OP_SAFE);
+       op = (void *)(reqhead + 1);
 
-       len = le64_to_cpu(reqhead->length);
+       len = le64_to_cpu(op->length);
        dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
             req->r_num_pages);
 
@@ -1070,7 +1081,8 @@ int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
 {
        struct ceph_msg *reqm = req->r_request;
        struct ceph_osd_request_head *reqhead = reqm->front.iov_base;
-       u64 off = le64_to_cpu(reqhead->offset);
+       struct ceph_osd_op *op = (void *)(reqhead + 1);
+       u64 off = le64_to_cpu(op->offset);
        int rc;
 
        dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages);
@@ -1079,7 +1091,7 @@ int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
                reqhead->flags = cpu_to_le32(CEPH_OSD_OP_ACK);
        else
                reqhead->flags = cpu_to_le32(CEPH_OSD_OP_SAFE);
-       reqhead->length = cpu_to_le64(len);
+       op->length = cpu_to_le64(len);
 
        /* reference pages in message */
        reqm->pages = req->r_pages;
index 5d386b5a7272a85a92ab544ad82ebd1aae02e274..06e965fec43edceb442cecb3a65e6f698deba436 100644 (file)
 class MOSDOp : public Message {
 private:
   ceph_osd_request_head head;
+public:
+  vector<ceph_osd_op> ops;
   vector<snapid_t> snaps;
+  osd_peer_stat_t peer_stat;
 
   friend class MOSDOpReply;
 
-public:
   snapid_t get_snap_seq() { return snapid_t(head.snap_seq); }
   vector<snapid_t> &get_snaps() { return snaps; }
   void set_snap_seq(snapid_t s) { head.snap_seq = s; }
@@ -54,48 +56,79 @@ public:
 
   eversion_t get_version() { return head.reassert_version; }
   
-  const int    get_op() { return head.op; }
-  void set_op(int o) { head.op = o; }
-  bool is_read() { return ceph_osd_op_is_read(get_op()); }
-
-  loff_t get_length() const { return head.length; }
-  loff_t get_offset() const { return head.offset; }
+  bool is_modify() { return head.is_modify; }
 
   unsigned get_inc_lock() const { return head.inc_lock; }
 
-  void set_peer_stat(const osd_peer_stat_t& stat) { head.peer_stat = stat; }
-  const ceph_osd_peer_stat& get_peer_stat() { return head.peer_stat; }
+  void set_peer_stat(const osd_peer_stat_t& stat) {
+    peer_stat = stat;
+    head.flags = (head.flags | CEPH_OSD_OP_PEERSTAT);
+  }
+  const osd_peer_stat_t& get_peer_stat() {
+    assert(head.flags & CEPH_OSD_OP_PEERSTAT);
+    return peer_stat; 
+  }
 
-  void inc_shed_count() { head.shed_count = get_shed_count() + 1; }
-  int get_shed_count() { return head.shed_count; }
-  
+  //void inc_shed_count() { head.shed_count = get_shed_count() + 1; }
+  //int get_shed_count() { return head.shed_count; }
 
 
-  MOSDOp(int inc, long tid,
-         object_t oid, ceph_object_layout ol, epoch_t mapepoch, int op,
+  MOSDOp(int inc, long tid, bool modify,
+         object_t oid, ceph_object_layout ol, epoch_t mapepoch,
         int flags) :
     Message(CEPH_MSG_OSD_OP) {
     memset(&head, 0, sizeof(head));
     head.tid = tid;
     head.client_inc = inc;
+    head.is_modify = modify;
     head.oid = oid;
     head.layout = ol;
     head.osdmap_epoch = mapepoch;
-    head.op = op;
     head.flags = flags;
   }
   MOSDOp() {}
 
-  void set_inc_lock(__u32 l) {
-    head.inc_lock = l;
+  void set_inc_lock(__u32 l) { head.inc_lock = l; }
+  void set_layout(const ceph_object_layout& l) { head.layout = l; }
+  void set_version(eversion_t v) { head.reassert_version = v; }
+
+  // ops
+  void add_simple_op(int o, __u64 off, __u64 len) {
+    ceph_osd_op op;
+    op.op = o;
+    op.offset = off;
+    op.length = len;
+    ops.push_back(op);
+  }
+  void write(__u64 off, __u64 len, bufferlist& bl) {
+    add_simple_op(CEPH_OSD_OP_WRITE, off, len);
+    data.claim(bl);
+    header.data_off = off;
+  }
+  void writefull(bufferlist& bl) {
+    add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
+    data.claim(bl);
+    header.data_off = 0;
+  }
+  void zero(__u64 off, __u64 len) {
+    add_simple_op(CEPH_OSD_OP_ZERO, off, len);
+  }
+  void truncate(__u64 off) {
+    add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
+  }
+  void remove() {
+    add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
   }
 
-  void set_layout(const ceph_object_layout& l) { head.layout = l; }
+  void read(__u64 off, __u64 len) {
+    add_simple_op(CEPH_OSD_OP_READ, off, len);
+  }
+  void stat() {
+    add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
+  }
 
-  void set_length(loff_t l) { head.length = l; }
-  void set_offset(loff_t o) { head.offset = o; }
-  void set_version(eversion_t v) { head.reassert_version = v; }
-  
+  // flags
   int get_flags() const { return head.flags; }
   bool wants_ack() const { return get_flags() & CEPH_OSD_OP_ACK; }
   bool wants_commit() const { return get_flags() & CEPH_OSD_OP_SAFE; }
@@ -113,27 +146,28 @@ public:
   // marshalling
   virtual void encode_payload() {
     head.num_snaps = snaps.size();
+    head.num_ops = ops.size();
     ::encode(head, payload);
-    for (unsigned i=0; i<snaps.size(); i++)
-      ::encode(snaps[i], payload);
-    header.data_off = get_offset();
+    ::encode_nohead(ops, payload);
+    ::encode_nohead(snaps, payload);
+    if (head.flags & CEPH_OSD_OP_PEERSTAT)
+      ::encode(peer_stat, payload);
   }
 
   virtual void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(head, p);
-    snaps.resize(head.num_snaps);
-    for (unsigned i=0; i<snaps.size(); i++)
-      ::decode(snaps[i], p);
+    decode_nohead(head.num_ops, ops, p);
+    decode_nohead(head.num_snaps, snaps, p);
+    if (head.flags & CEPH_OSD_OP_PEERSTAT)
+      ::decode(peer_stat, p);
   }
 
 
   const char *get_type_name() { return "osd_op"; }
   void print(ostream& out) {
-    out << "osd_op(" << get_reqid()
-       << " " << ceph_osd_op_name(get_op())
-       << " " << head.oid;
-    if (get_length()) out << " " << get_offset() << "~" << get_length();
+    out << "osd_op(" << get_reqid();
+    out << " " << head.oid << " " << ops;
     out << " " << pg_t(head.layout.ol_pgid);
     if (is_retry_attempt()) out << " RETRY";
     if (!snaps.empty())
index a02895b99b3c60ee0086d618a16d00a424282364..3526b23f34910bdbaba1eff213d1fa4795866451 100644 (file)
 
 class MOSDOpReply : public Message {
   ceph_osd_reply_head head;
-
  public:
+  vector<ceph_osd_op> ops;
+
   long     get_tid() { return head.tid; }
   object_t get_oid() { return head.oid; }
   pg_t     get_pg() { return pg_t(head.layout.ol_pgid); }
-  int      get_op()  { return head.op; }
   int      get_flags() { return head.flags; }
   bool     is_safe() { return get_flags() & CEPH_OSD_OP_SAFE; }
   
   __s32 get_result() { return head.result; }
-  __u64 get_length() { return head.length; }
-  __u64 get_offset() { return head.offset; }
   eversion_t get_version() { return head.reassert_version; }
 
+  bool is_modify() { return head.is_modify; }
+
   void set_result(int r) { head.result = r; }
-  void set_length(loff_t s) { head.length = s; }
-  void set_offset(loff_t o) { head.offset = o; }
   void set_version(eversion_t v) { head.reassert_version = v; }
 
-  void set_op(int op) { head.op = op; }
-
   // osdmap
   epoch_t get_map_epoch() { return head.osdmap_epoch; }
 
@@ -61,14 +57,13 @@ public:
     Message(CEPH_MSG_OSD_OPREPLY) {
     memset(&head, 0, sizeof(head));
     head.tid = req->head.tid;
-    head.op = req->head.op;
+    head.is_modify = req->is_modify();
+    ops = req->ops;
+    head.result = result;
     head.flags = commit ? CEPH_OSD_OP_SAFE:0;
     head.oid = req->head.oid;
     head.layout = req->head.layout;
     head.osdmap_epoch = e;
-    head.result = result;
-    head.offset = req->head.offset;
-    head.length = req->head.length;  // speculative... OSD should ensure these are correct
     head.reassert_version = req->head.reassert_version;
   }
   MOSDOpReply() {}
@@ -78,20 +73,20 @@ public:
   virtual void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(head, p);
+    ::decode_nohead(head.num_ops, ops, p);
   }
   virtual void encode_payload() {
+    head.num_ops = ops.size();
     ::encode(head, payload);
-    header.data_off = get_offset();
+    ::encode_nohead(ops, payload);
   }
 
   const char *get_type_name() { return "osd_op_reply"; }
   
   void print(ostream& out) {
     out << "osd_op_reply(" << get_tid()
-       << " " << ceph_osd_op_name(get_op())
-       << " " << head.oid;
-    if (get_length()) out << " " << get_offset() << "~" << get_length();
-    if (get_op() >= 10) {
+       << " " << head.oid << " " << ops;
+    if (is_modify()) {
       if (is_safe())
        out << " commit";
       else
index fb9a2de10775ad03de46c0f741f2fe955b93850c..eab4a9d6eea76afb20066bdb95f9488e4a029a75 100644 (file)
@@ -33,9 +33,10 @@ public:
   // subop
   pg_t pgid;
   pobject_t poid;
-  int32_t op;
-  loff_t offset, length;
   
+  bool is_modify, wants_reply;
+  vector<ceph_osd_op> ops;
+
   // subop metadata
   tid_t rep_tid;
   eversion_t version, old_version;
@@ -59,9 +60,7 @@ public:
     ::decode(reqid, p);
     ::decode(pgid, p);
     ::decode(poid, p);
-    ::decode(op, p);
-    ::decode(offset, p);
-    ::decode(length, p);
+    ::decode(ops, p);
     ::decode(rep_tid, p);
     ::decode(version, p);
     ::decode(old_version, p);
@@ -80,9 +79,7 @@ public:
     ::encode(reqid, payload);
     ::encode(pgid, payload);
     ::encode(poid, payload);
-    ::encode(op, payload);
-    ::encode(offset, payload);
-    ::encode(length, payload);
+    ::encode(ops, payload);
     ::encode(rep_tid, payload);
     ::encode(version, payload);
     ::encode(old_version, payload);
@@ -94,26 +91,22 @@ public:
     ::encode(attrset, payload);
     ::encode(data_subset, payload);
     ::encode(clone_subsets, payload);
-    header.data_off = offset;
+    if (ops.size())
+      header.data_off = ops[0].offset;
+    else
+      header.data_off = 0;
   }
 
-  bool wants_reply() {
-    if (op < 100) return true;
-    return false;  // no reply needed for primary-lock, -unlock.
-  }
 
-  bool is_read() { return op < 10; }
-  MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, int o, loff_t of, loff_t le,
+  MOSDSubOp(osd_reqid_t r, pg_t p, pobject_t po, vector<ceph_osd_op>& o, bool wr,
            epoch_t mape, tid_t rtid, unsigned il, eversion_t v) :
     Message(MSG_OSD_SUBOP),
     map_epoch(mape),
     reqid(r),
     pgid(p),
     poid(po),
-    op(o),
-    offset(of),
-    length(le),
+    wants_reply(wr),
+    ops(o),
     rep_tid(rtid),
     version(v),
     inc_lock(il)
@@ -125,11 +118,10 @@ public:
   const char *get_type_name() { return "osd_sub_op"; }
   void print(ostream& out) {
     out << "osd_sub_op(" << reqid
-       << " " << ceph_osd_op_name(op)
        << " " << poid
+       << " " << ops
        << " v " << version
        << " snapset=" << snapset << " snapc=" << snapc;    
-    if (length) out << " " << offset << "~" << length;
     if (!data_subset.empty()) out << " subset " << data_subset;
     out << ")";
   }
index 90c3468b8c72ff432208f62ac96eeca83748afe7..6973e7c46abad1ead4859b41b0f7737e4abb1c1f 100644 (file)
@@ -36,10 +36,11 @@ class MOSDSubOpReply : public Message {
   osd_reqid_t reqid;
   pg_t pgid;
   tid_t rep_tid;
-  int32_t op;
   pobject_t poid;
-  off_t length, offset;
-  
+
+public:
+  vector<ceph_osd_op> ops;
+
   // result
   bool commit;
   int32_t result;
@@ -50,17 +51,14 @@ class MOSDSubOpReply : public Message {
 
   map<string,bufferptr> attrset;
 
- public:
   virtual void decode_payload() {
     bufferlist::iterator p = payload.begin();
     ::decode(map_epoch, p);
     ::decode(reqid, p);
     ::decode(pgid, p);
     ::decode(rep_tid, p);
-    ::decode(op, p);
     ::decode(poid, p);
-    ::decode(length, p);
-    ::decode(offset, p);
+    ::decode(ops, p);
     ::decode(commit, p);
     ::decode(result, p);
     ::decode(pg_complete_thru, p);
@@ -72,26 +70,20 @@ class MOSDSubOpReply : public Message {
     ::encode(reqid, payload);
     ::encode(pgid, payload);
     ::encode(rep_tid, payload);
-    ::encode(op, payload);
     ::encode(poid, payload);
-    ::encode(length, payload);
-    ::encode(offset, payload);
+    ::encode(ops, payload);
     ::encode(commit, payload);
     ::encode(result, payload);
     ::encode(pg_complete_thru, payload);
     ::encode(peer_stat, payload);
     ::encode(attrset, payload);
-    header.data_off = offset;
   }
 
   epoch_t get_map_epoch() { return map_epoch; }
 
   pg_t get_pg() { return pgid; }
   tid_t get_rep_tid() { return rep_tid; }
-  int get_op()  { return op; }
   pobject_t get_poid() { return poid; }
-  const off_t get_length() { return length; }
-  const off_t get_offset() { return offset; }
 
   bool get_commit() { return commit; }
   int get_result() { return result; }
@@ -112,10 +104,8 @@ public:
     reqid(req->reqid),
     pgid(req->pgid),
     rep_tid(req->rep_tid),
-    op(req->op),
     poid(req->poid),
-    length(req->length),
-    offset(req->offset),
+    ops(req->ops),
     commit(commit_),
     result(result_) {
     memset(&peer_stat, 0, sizeof(peer_stat));
@@ -126,15 +116,11 @@ public:
   
   void print(ostream& out) {
     out << "osd_sub_op_reply(" << reqid
-       << " " << ceph_osd_op_name(op)
-       << " " << poid;
-    if (length) out << " " << offset << "~" << length;
-    if (op >= 10) {
-      if (commit)
-       out << " commit";
-      else
-       out << " ack";
-    }
+       << " " << poid << " " << ops;
+    if (commit)
+      out << " commit";
+    else
+      out << " ack";
     out << " = " << result;
     out << ")";
   }
index e20abab0fe2a9ef9554bf69d2a66209c3b63e0ec..a0a3b5115ae800823b876161d9ceb0325d93d3f3 100644 (file)
@@ -3002,7 +3002,7 @@ void OSD::handle_op(MOSDOp *op)
   stat_oprate.hit(now);
   stat_ops++;
   stat_qlen += pending_ops;
-  if (op->get_op() == CEPH_OSD_OP_READ) {
+  if (!op->is_modify()) {
     stat_rd_ops++;
     if (op->get_source().is_osd()) {
       //derr(-10) << "shed in " << stat_rd_ops_shed_in << " / " << stat_rd_ops << dendl;
@@ -3036,7 +3036,7 @@ void OSD::handle_op(MOSDOp *op)
     }
 
     // pg must be same-ish...
-    if (op->is_read()) {
+    if (!op->is_modify()) {
       // read
       if (!pg->same_for_read_since(op->get_map_epoch())) {
        dout(7) << "handle_rep_op pg changed " << pg->info.history
@@ -3142,7 +3142,7 @@ void OSD::handle_op(MOSDOp *op)
     return;
   }
 
-  if (op->get_op() == CEPH_OSD_OP_READ) {
+  if (!op->is_modify()) {
     Mutex::Locker lock(peer_stat_lock);
     stat_rd_ops_in_queue++;
   }
index 6433d3cfa943a27c4d54ac123632deb5c80a8690..b6856a73fa2ca57268789a55562518f970533b6e 100644 (file)
@@ -1401,28 +1401,24 @@ void PG::trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v)
 }
 
 
-void PG::append_log(ObjectStore::Transaction &t, const PG::Log::Entry &logentry, 
-                    eversion_t trim_to)
+void PG::append_log(ObjectStore::Transaction &t, bufferlist& bl,
+                   eversion_t logversion, eversion_t trim_to)
 {
-  dout(10) << "append_log " << ondisklog.top << " " << logentry << dendl;
+  dout(10) << "append_log " << ondisklog.top << dendl;
 
-  // write entry on disk
-  bufferlist bl;
-  ::encode(logentry, bl);
-  /*
   if (g_conf.osd_pad_pg_log) {  // pad to 4k, until i fix ebofs reallocation crap.  FIXME.
-    bufferptr bp(4096 - sizeof(logentry));
+    bufferptr bp(4096 - bl.length());
     bl.push_back(bp);
   }
-  */
   t.write(0, info.pgid.to_pobject(), ondisklog.top, bl.length(), bl );
   
   // update block map?
   if (ondisklog.top % 4096 == 0) 
-    ondisklog.block_map[ondisklog.top] = logentry.version;
+    ondisklog.block_map[ondisklog.top] = logversion;
   
   ondisklog.top += bl.length();
-  t.collection_setattr(info.pgid.to_coll(), "ondisklog_top", &ondisklog.top, sizeof(ondisklog.top));
+  t.collection_setattr(info.pgid.to_coll(), "ondisklog_top",
+                      &ondisklog.top, sizeof(ondisklog.top));
   
   // trim?
   if (trim_to > log.bottom &&
index 29251e5ca4e4b93b916ab2afb13ae33d8b67a883..459888f11439be0c97a0d9d067218c6079f188ea 100644 (file)
@@ -727,9 +727,8 @@ public:
   // pg on-disk state
   void write_info(ObjectStore::Transaction& t);
   void write_log(ObjectStore::Transaction& t);
-  void append_log(ObjectStore::Transaction &t, 
-                  const PG::Log::Entry &logentry, 
-                  eversion_t trim_to);
+  void append_log(ObjectStore::Transaction &t, bufferlist& bl,
+                 eversion_t log_version, eversion_t trim_to);
   void read_log(ObjectStore *store);
   void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v);
 
@@ -835,7 +834,8 @@ inline ostream& operator<<(ostream& out, const PG& pg)
   out << "pg[" << pg.info 
       << " r=" << pg.get_role();
 
-  if (pg.log.bottom != pg.info.log_bottom)
+  if (pg.log.bottom != pg.info.log_bottom ||
+      pg.log.top != pg.info.last_update)
     out << " (info mismatch, " << pg.log << ")";
 
   if (pg.log.log.empty()) {
index b7ad015ad6c9a3eba9d69e2c214c2266c4b53c63..1ca57352f6d4bc747a84da3f2a90378c946e1931 100644 (file)
@@ -47,6 +47,7 @@ bool RAID4PG::preprocess_op(MOSDOp *op, utime_t now)
 
 void RAID4PG::do_op(MOSDOp *op)
 {
+  /*
 
   // a write will do something like
   object_t oid = op->get_oid();   // logical object
@@ -75,7 +76,7 @@ void RAID4PG::do_op(MOSDOp *op)
     if (rank == n) rank = 0;
   }
 
-  
+  */
   
 
 }
index fe5c54b8b1db704643193bf5feff52ac8a36edda..d99663400072126bab507ceca9b69b26c6443a6d 100644 (file)
@@ -110,8 +110,11 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, Message *m)
 bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
 {
   // we only care about reads here on out..
-  if (!op->is_read()) 
+  if (op->is_modify() ||
+      op->ops.size() < 1 ||
+      op->ops[0].op != CEPH_OSD_OP_READ) 
     return false;
+  ceph_osd_op& readop = op->ops[0];
 
   object_t oid = op->get_oid();
   pobject_t poid(info.pgid.pool(), 0, oid);
@@ -168,11 +171,12 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
        ceph_object_layout layout;
        layout.ol_pgid = info.pgid.u.pg64;
        layout.ol_stripe_unit = 0;
-       MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
+       MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
                                 oid,
                                 layout,
                                 osd->osdmap->get_epoch(),
-                                CEPH_OSD_OP_BALANCEREADS, 0);
+                                0);
+       pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0);
        do_op(pop);
       }
       if (is_balanced && !should_balance &&
@@ -182,11 +186,12 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
        ceph_object_layout layout;
        layout.ol_pgid = info.pgid.u.pg64;
        layout.ol_stripe_unit = 0;
-       MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
+       MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
                                 oid,
                                 layout,
                                 osd->osdmap->get_epoch(),
-                                CEPH_OSD_OP_UNBALANCEREADS, 0);
+                                0);
+       pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0);
        do_op(pop);
       }
     }
@@ -336,8 +341,8 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
   // if this is a read and the data is in the cache, do an immediate read.. 
   if ( g_conf.osd_immediate_read_from_cache ) {
     if (osd->store->is_cached(info.pgid.to_coll(), poid,
-                             op->get_offset(), 
-                             op->get_length()) == 0) {
+                             readop.offset,
+                             readop.length) == 0) {
       if (!is_primary() && !op->get_source().is_osd()) {
        // am i allowed?
        bool v;
@@ -370,10 +375,10 @@ void ReplicatedPG::do_op(MOSDOp *op)
 
   osd->logger->inc("op");
 
-  if (ceph_osd_op_is_read(op->get_op()))
-    op_read(op);
-  else
+  if (op->is_modify())
     op_modify(op);
+  else
+    op_read(op);
 }
 
 
@@ -383,7 +388,10 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op)
 
   osd->logger->inc("subop");
 
-  switch (op->op) {
+  assert(op->ops.size() >= 1);
+  ceph_osd_op& first = op->ops[0];
+
+  switch (first.op) {
     // rep stuff
   case CEPH_OSD_OP_PULL:
     sub_op_pull(op);
@@ -393,18 +401,15 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op)
     break;
 
   default:
-    if (ceph_osd_op_is_modify(op->op) ||
-       ceph_osd_op_is_lock(op->op))
-      sub_op_modify(op);
-    else
-      assert(0);
+    sub_op_modify(op);
   }
-
 }
 
 void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
 {
-  if (r->get_op() == CEPH_OSD_OP_PUSH) {
+  assert(r->ops.size() >= 1);
+  ceph_osd_op& first = r->ops[0];
+  if (first.op == CEPH_OSD_OP_PUSH) {
     // continue peer recovery
     sub_op_push_reply(r);
   } else {
@@ -621,15 +626,20 @@ void ReplicatedPG::op_read(MOSDOp *op)
   object_t oid = op->get_oid();
   pobject_t poid(info.pgid.pool(), 0, oid);
 
-  dout(10) << "op_read " << ceph_osd_op_name(op->get_op())
+  ceph_osd_op& readop = op->ops[0];
+
+  dout(10) << "op_read " << ceph_osd_op_name(readop.op)
           << " " << oid 
-           << " " << op->get_offset() << "~" << op->get_length() 
+           << " " << readop.offset << "~" << readop.length
            << dendl;
   
   // wrlocked?
   if (block_if_wrlocked(op)) 
     return;
 
+  // taking read shedding out for now, because i don't want to embed
+  // the osd_peer_stat in MOSDOp
+
   // !primary and unbalanced?
   //  (ignore ops forwarded from the primary)
   if (!is_primary()) {
@@ -637,6 +647,7 @@ void ReplicatedPG::op_read(MOSDOp *op)
        op->get_source().num() == get_primary()) {
       // read was shed to me by the primary
       int from = op->get_source().num();
+      assert(op->get_flags() & CEPH_OSD_OP_PEERSTAT);
       osd->take_peer_stat(from, op->get_peer_stat());
       dout(10) << "read shed IN from " << op->get_source() 
                << " " << op->get_reqid()
@@ -671,7 +682,6 @@ void ReplicatedPG::op_read(MOSDOp *op)
       }
     }
   }
-  
 
   // set up reply
   MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); 
@@ -696,23 +706,24 @@ void ReplicatedPG::op_read(MOSDOp *op)
     }
   }
   
-  switch (op->get_op()) {
+  switch (readop.op) {
   case CEPH_OSD_OP_READ:
     {
       // read into a buffer
       bufferlist bl;
       r = osd->store->read(info.pgid.to_coll(), poid, 
-                          op->get_offset(), op->get_length(),
+                          readop.offset, readop.length,
                           bl);
       reply->set_data(bl);
+      reply->get_header().data_off = readop.offset;
       if (r >= 0) 
-       reply->set_length(r);
+       reply->ops[0].length = r;
       else
-       reply->set_length(0);
-      dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
+       reply->ops[0].length = 0;
+      dout(10) << " read got " << r << " / " << readop.length << " bytes from obj " << oid << dendl;
     }
     osd->logger->inc("c_rd");
-    osd->logger->inc("c_rdb", op->get_length());
+    osd->logger->inc("c_rdb", reply->ops[0].length);
     break;
     
   case CEPH_OSD_OP_STAT:
@@ -721,7 +732,7 @@ void ReplicatedPG::op_read(MOSDOp *op)
       memset(&st, sizeof(st), 0);
       r = osd->store->stat(info.pgid.to_coll(), poid, &st);
       if (r >= 0)
-       reply->set_length(st.st_size);
+       reply->ops[0].length = st.st_size;
     }
     break;
     
@@ -774,119 +785,106 @@ void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
   t.setattr(info.pgid.to_coll(), coid, "from_version", &ov, sizeof(v));
 }
 
-void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
-                                      pobject_t poid, int op,
-                                      eversion_t old_version, eversion_t at_version,
-                                      off_t offset, off_t length, bufferlist& bl,
-                                      SnapSet& snapset, SnapContext& snapc,
-                                      __u32 inc_lock, eversion_t trim_to)
+void ReplicatedPG::prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid,
+                                pobject_t poid, loff_t old_size,
+                                eversion_t old_version, eversion_t at_version, 
+                                SnapSet& snapset, SnapContext& snapc)
 {
-  // WRNOOP does nothing.
-  if (op == CEPH_OSD_OP_WRNOOP) 
-    return;
-
-  // munge zero into remove?
-  if (op == CEPH_OSD_OP_ZERO) {
-    struct stat st;
-    int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
-    if (r >= 0) {
-      if (offset == 0 && offset + length >= (loff_t)st.st_size) {
-       dout(10) << " munging ZERO " << offset << "~" << length
-                << " -> DELETE (size is " << st.st_size << ")" << dendl;
-       op = CEPH_OSD_OP_DELETE;
-      }
-    }
-  }
-
   // clone?
-  if (poid.oid.snap) {
-    assert(poid.oid.snap == CEPH_NOSNAP);
-    dout(20) << "snapset=" << snapset << "  snapc=" << snapc << dendl;;
-
-    // use newer snapc?
-    if (snapset.seq > snapc.seq) {
-      snapc.seq = snapset.seq;
-      snapc.snaps = snapset.snaps;
-      dout(10) << " using newer snapc " << snapc << dendl;
+  assert(poid.oid.snap == CEPH_NOSNAP);
+  dout(20) << "snapset=" << snapset << "  snapc=" << snapc << dendl;;
+  
+  // use newer snapc?
+  if (snapset.seq > snapc.seq) {
+    snapc.seq = snapset.seq;
+    snapc.snaps = snapset.snaps;
+    dout(10) << " using newer snapc " << snapc << dendl;
+  }
+  
+  if (snapset.head_exists &&           // head exists
+      snapc.snaps.size() &&            // there are snaps
+      snapc.snaps[0] > snapset.seq) {  // existing object is old
+    // clone
+    pobject_t coid = poid;
+    coid.oid.snap = snapc.seq;
+    
+    unsigned l;
+    for (l=1; l<snapc.snaps.size() && snapc.snaps[l] > snapset.seq; l++) ;
+    
+    vector<snapid_t> snaps(l);
+    for (unsigned i=0; i<l; i++)
+      snaps[i] = snapc.snaps[i];
+    
+    bufferlist snapsbl;
+    ::encode(snaps, snapsbl);
+    
+    // log clone
+    dout(10) << "cloning to " << coid << " v " << at_version << " snaps=" << snaps << dendl;
+    Log::Entry cloneentry(PG::Log::Entry::CLONE, coid.oid, at_version, old_version, reqid);
+    cloneentry.snaps = snapsbl;
+    dout(10) << "prepare_transaction " << cloneentry << dendl;
+    log.add(cloneentry);
+    ::encode(cloneentry, logbl);
+    assert(log.top == at_version);
+    
+    // prepare clone
+    _make_clone(t, poid, coid, old_version, at_version, snapsbl);
+    
+    // add to snap bound collections
+    coll_t fc = make_snap_collection(t, snaps[0]);
+    t.collection_add(fc, info.pgid.to_coll(), coid);
+    if (snaps.size() > 1) {
+      coll_t lc = make_snap_collection(t, snaps[snaps.size()-1]);
+      t.collection_add(lc, info.pgid.to_coll(), coid);
     }
+    
+    snapset.clones.push_back(coid.oid.snap);
+    snapset.clone_size[coid.oid.snap] = old_size;
+    snapset.clone_overlap[coid.oid.snap].insert(0, old_size);
+    
+    at_version.version++;
+  }
+  
+  // update snapset with latest snap context
+  snapset.seq = snapc.seq;
+  snapset.snaps = snapc.snaps;
+}
 
-    if (snapset.head_exists &&           // head exists
-       snapc.snaps.size() &&            // there are snaps
-       snapc.snaps[0] > snapset.seq &&  // existing object is old
-       ceph_osd_op_is_modify(op)) {     // is a (non-lock) modification
-      // clone
-      pobject_t coid = poid;
-      coid.oid.snap = snapc.seq;
-
-      struct stat st;
-      osd->store->stat(info.pgid.to_coll(), poid, &st);
-      
-      unsigned l;
-      for (l=1; l<snapc.snaps.size() && snapc.snaps[l] > snapset.seq; l++) ;
-
-      vector<snapid_t> snaps(l);
-      for (unsigned i=0; i<l; i++)
-       snaps[i] = snapc.snaps[i];
-
-      bufferlist snapsbl;
-      ::encode(snaps, snapsbl);
-
-      // log clone
-      dout(10) << "cloning to " << coid << " v " << at_version << " snaps=" << snaps << dendl;
-      Log::Entry cloneentry(PG::Log::Entry::CLONE, coid.oid, at_version, old_version, reqid);
-      cloneentry.snaps = snapsbl;
-      dout(10) << "prepare_transaction " << cloneentry << dendl;
-      log.add(cloneentry);
-      assert(log.top == at_version);
-
-      // prepare clone
-      _make_clone(t, poid, coid, old_version, at_version, snapsbl);
-      
-      // add to snap bound collections
-      coll_t fc = make_snap_collection(t, snaps[0]);
-      t.collection_add(fc, info.pgid.to_coll(), coid);
-      if (snaps.size() > 1) {
-       coll_t lc = make_snap_collection(t, snaps[snaps.size()-1]);
-       t.collection_add(lc, info.pgid.to_coll(), coid);
-      }
-
-      snapset.clones.push_back(coid.oid.snap);
-      snapset.clone_size[coid.oid.snap] = st.st_size;
-      snapset.clone_overlap[coid.oid.snap].insert(0, st.st_size);
-      
-      at_version.version++;
-    }
 
-    // update snapset with latest snap context
-    snapset.seq = snapc.seq;
-    snapset.snaps = snapc.snaps;
-
-    // munge delete into a truncate?
-    if (op == CEPH_OSD_OP_DELETE &&
-       snapset.clones.size()) {
-      dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << snapset.clones << dendl;
-      op = CEPH_OSD_OP_TRUNCATE;
-      length = 0;
-      snapset.head_exists = false;
+// low level object operations
+int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid,
+                                   pobject_t poid, __u64& old_size,
+                                   ceph_osd_op& op, bufferlist::iterator& bp,
+                                   SnapSet& snapset, SnapContext& snapc)
+{
+  int eop = op.op;
+
+  // munge ZERO -> DELETE or TRUNCATE?
+  if (eop == CEPH_OSD_OP_ZERO &&
+      snapset.head_exists &&
+      op.offset + op.length >= old_size) {
+    if (op.offset == 0) {
+      dout(10) << " munging ZERO " << op.offset << "~" << op.length
+              << " -> DELETE (size is " << old_size << ")" << dendl;
+      eop = CEPH_OSD_OP_DELETE;
+    } else {
+      dout(10) << " munging ZERO " << op.offset << "~" << op.length
+              << " -> TRUNCATE (size is " << old_size << ")" << dendl;
+      eop = CEPH_OSD_OP_TRUNCATE;
     }
   }
+  // munge DELETE -> TRUNCATE?
+  if (eop == CEPH_OSD_OP_DELETE &&
+      snapset.clones.size()) {
+    dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << snapset.clones << dendl;
+    eop = CEPH_OSD_OP_TRUNCATE;
+    op.length = 0;
+    snapset.head_exists = false;
+  }
 
-  
-  // log op
-  int opcode = Log::Entry::MODIFY;
-  if (op == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
-  Log::Entry logentry(opcode, poid.oid, at_version, old_version, reqid);
-  dout(10) << "prepare_transaction " << logentry << dendl;
-
-  assert(at_version > log.top);
-  log.add(logentry);
-  assert(log.top == at_version);
-
-  // prepare op
-  switch (op) {
+  switch (eop) {
 
     // -- locking --
-
   case CEPH_OSD_OP_WRLOCK:
     { // lock object
       t.setattr(info.pgid.to_coll(), poid, "wrlock", &reqid.name, sizeof(entity_name_t));
@@ -915,43 +913,43 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
 
   case CEPH_OSD_OP_WRITE:
     { // write
-      assert(bl.length() == length);
       bufferlist nbl;
-      nbl.claim(bl);    // give buffers to store; we keep *op in memory for a long time!
-      t.write(info.pgid.to_coll(), poid, offset, length, nbl);
+      bp.copy(op.length, nbl);
+      t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
       snapset.head_exists = true;
       if (snapset.clones.size()) {
        snapid_t newest = *snapset.clones.rbegin();
        interval_set<__u64> ch;
-       ch.insert(offset, length);
+       ch.insert(op.offset, op.length);
        ch.intersection_of(snapset.clone_overlap[newest]);
        snapset.clone_overlap[newest].subtract(ch);
       }
+      old_size = MAX(old_size, op.offset + op.length);
     }
     break;
     
   case CEPH_OSD_OP_WRITEFULL:
     { // write full object
-      assert(bl.length() == length);
       bufferlist nbl;
-      nbl.claim(bl);    // give buffers to store; we keep *op in memory for a long time!
+      bp.copy(op.length, nbl);
       t.truncate(info.pgid.to_coll(), poid, 0);
-      t.write(info.pgid.to_coll(), poid, offset, length, nbl);
+      t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
       snapset.head_exists = true;
       if (snapset.clones.size()) {
        snapid_t newest = *snapset.clones.rbegin();
        snapset.clone_overlap.erase(newest);
       }
+      old_size = op.length;
     }
     break;
     
   case CEPH_OSD_OP_ZERO:
     { // zero
-      t.zero(info.pgid.to_coll(), poid, offset, length);
+      t.zero(info.pgid.to_coll(), poid, op.offset, op.length);
       if (snapset.clones.size()) {
        snapid_t newest = *snapset.clones.rbegin();
        interval_set<__u64> ch;
-       ch.insert(offset, length);
+       ch.insert(op.offset, op.length);
        ch.intersection_of(snapset.clone_overlap[newest]);
        snapset.clone_overlap[newest].subtract(ch);
       }
@@ -960,14 +958,15 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
 
   case CEPH_OSD_OP_TRUNCATE:
     { // truncate
-      t.truncate(info.pgid.to_coll(), poid, length);
+      t.truncate(info.pgid.to_coll(), poid, op.length);
       if (snapset.clones.size()) {
        snapid_t newest = *snapset.clones.rbegin();
        interval_set<__u64> keep;
-       if (length)
-         keep.insert(0, length);
+       if (op.length)
+         keep.insert(0, op.length);
        snapset.clone_overlap[newest].intersection_of(keep);
       }
+      old_size = op.length;
     }
     break;
     
@@ -978,16 +977,52 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
        snapid_t newest = *snapset.clones.rbegin();
        snapset.clone_overlap.erase(newest);  // ok, redundant.
       }
+      old_size = 0;
     }
     break;
     
   default:
-    assert(0);
+    return -EINVAL;
   }
+
+  return 0;
+}
+
+void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
+                                      pobject_t poid,
+                                      vector<ceph_osd_op>& ops, bufferlist& bl,
+                                      eversion_t old_version, eversion_t at_version,
+                                      SnapSet& snapset, SnapContext& snapc,
+                                      __u32 inc_lock, eversion_t trim_to)
+{
+  bufferlist log_bl;
+  eversion_t log_version = at_version;
+  assert(!ops.empty());
   
-  // object collection, version
-  if (op != CEPH_OSD_OP_DELETE) {
-    if (inc_lock && ceph_osd_op_is_modify(op)) 
+  // stat existing
+  struct stat st;
+  int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+  __u64 old_size = 0;
+  if (r == 0)
+    old_size = st.st_size;
+
+  // apply ops
+  bool did_snap = false;
+  bufferlist::iterator bp = bl.begin();
+  for (unsigned i=0; i<ops.size(); i++) {
+    // clone?
+    if (!did_snap && poid.oid.snap &&
+       ceph_osd_op_is_modify(ops[i].op)) {     // is a (non-lock) modification
+      prepare_clone(t, log_bl, reqid, poid, old_size, old_version, at_version, snapset, snapc);
+      did_snap = true;
+    }
+    prepare_simple_op(t, reqid, poid, old_size, ops[i], bp,
+                     snapset, snapc);
+  }
+
+  // finish.
+  if (!old_size >= 0) {
+    if (inc_lock)
       t.setattr(info.pgid.to_coll(), poid, "inc_lock", &inc_lock, sizeof(inc_lock));
 
     t.setattr(info.pgid.to_coll(), poid, "version", &at_version, sizeof(at_version));
@@ -1005,11 +1040,19 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
   // raise last_update.
   assert(at_version > info.last_update);
   info.last_update = at_version;
-  
-  write_info(t);
 
-  // prepare log append
-  append_log(t, logentry, trim_to);
+  // log mutation
+  int logopcode = Log::Entry::MODIFY;
+  if (!snapset.head_exists)
+    logopcode = Log::Entry::DELETE;
+  Log::Entry logentry(logopcode, poid.oid, at_version, old_version, reqid);
+  dout(10) << "prepare_transaction " << logentry << dendl;
+  log.add(logentry);
+  ::encode(logentry, log_bl);
+
+  // write pg info, log to disk
+  write_info(t);
+  append_log(t, log_bl, log_version, trim_to);
 }
 
 
@@ -1058,8 +1101,9 @@ void ReplicatedPG::apply_repop(RepGather *repop)
 
   // any completion stuff to do here?
   object_t oid = repop->op->get_oid();
+  ceph_osd_op& first = repop->op->ops[0];
 
-  switch (repop->op->get_op()) { 
+  switch (first.op) { 
   case CEPH_OSD_OP_UNBALANCEREADS:
     dout(-10) << "apply_repop  completed unbalance-reads on " << oid << dendl;
     unbalancing_reads.erase(oid);
@@ -1166,8 +1210,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, int dest, utime_t now)
   
   // forward the write/update/whatever
   MOSDSubOp *wr = new MOSDSubOp(repop->op->get_reqid(), info.pgid, poid,
-                               repop->op->get_op(), 
-                               repop->op->get_offset(), repop->op->get_length(), 
+                               repop->op->ops, true,
                                osd->osdmap->get_epoch(), 
                                repop->rep_tid, repop->op->get_inc_lock(), repop->at_version);
   wr->old_version = repop->old_version;
@@ -1344,23 +1387,14 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   int whoami = osd->get_nodeid();
   pobject_t poid(info.pgid.pool(), 0, op->get_oid());
 
-  const char *opname = ceph_osd_op_name(op->get_op());
-
-  // make sure it looks ok
-  if ((op->get_op() == CEPH_OSD_OP_WRITE ||
-       op->get_op() == CEPH_OSD_OP_WRITEFULL) &&
-      op->get_length() != op->get_data().length()) {
-    dout(0) << "op_modify got bad write, claimed length " << op->get_length() 
-           << " != payload length " << op->get_data().length()
-           << dendl;
-    delete op;
-    return;
-  }
+  const char *opname = "no-op";
+  if (op->ops.size())
+    opname = ceph_osd_op_name(op->ops[0].op);
 
   // --- locking ---
 
   // wrlock?
-  if (op->get_op() != CEPH_OSD_OP_WRNOOP &&  // except WRNOOP; we just want to flush
+  if (!op->ops.empty() &&  // except WRNOOP; we just want to flush
       block_if_wrlocked(op)) 
     return; // op will be handled later, after the object unlocks
   
@@ -1377,6 +1411,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   }
 
   // balance-reads set?
+#if 0
   char v;
   if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) &&
       (osd->store->getattr(info.pgid.to_coll(), poid, "balance-reads", &v, 1) >= 0 ||
@@ -1403,19 +1438,20 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     waiting_for_unbalanced_reads[poid.oid].push_back(op);
     return;
   }
+#endif
 
   // dup op?
   if (is_dup(op->get_reqid())) {
     dout(3) << "op_modify " << opname << " dup op " << op->get_reqid()
              << ", doing WRNOOP" << dendl;
-    op->set_op(CEPH_OSD_OP_WRNOOP);
-    opname = ceph_osd_op_name(op->get_op());
+    op->ops.clear();
+    opname = "no-op";
   }
 
 
   // version
   eversion_t av = log.top;
-  if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+  if (op->ops.size()) {
     av.epoch = osd->osdmap->get_epoch();
     av.version++;
     assert(av > info.last_update);
@@ -1448,7 +1484,6 @@ void ReplicatedPG::op_modify(MOSDOp *op)
 
   dout(10) << "op_modify " << opname 
            << " " << poid.oid 
-           << " " << op->get_offset() << "~" << op->get_length()
            << " ov " << old_version << " av " << av 
           << " snapc " << snapc
           << " snapset " << snapset
@@ -1474,15 +1509,17 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     }
   }
 
-  if (op->get_op() == CEPH_OSD_OP_WRITE ||
-      op->get_op() == CEPH_OSD_OP_WRITEFULL) {
+  if (op->get_data().length()) {
     osd->logger->inc("c_wr");
-    osd->logger->inc("c_wrb", op->get_length());
+    osd->logger->inc("c_wrb", op->get_data().length());
   }
 
   // note my stats
   utime_t now = g_clock.now();
 
+  // permute operation?
+  // ...
+
   // issue replica writes
   tid_t rep_tid = osd->get_tid();
   RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc);
@@ -1490,11 +1527,10 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     issue_repop(repop, acting[i], now);
 
   // we are acker.
-  if (op->get_op() != CEPH_OSD_OP_WRNOOP) {
+  if (op->ops.size()) {
     // log and update later.
-    prepare_transaction(repop->t, op->get_reqid(), poid, op->get_op(),
+    prepare_transaction(repop->t, op->get_reqid(), poid, op->ops, op->get_data(),
                        old_version, av,
-                       op->get_offset(), op->get_length(), op->get_data(),
                        snapset, snapc,
                        op->get_inc_lock(), peers_complete_thru);
   }
@@ -1516,12 +1552,13 @@ void ReplicatedPG::op_modify(MOSDOp *op)
 void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
 {
   pobject_t poid = op->poid;
-  const char *opname = ceph_osd_op_name(op->op);
+  const char *opname = "no-op";
+  if (op->ops.size())
+    ceph_osd_op_name(op->ops[0].op);
   
   dout(10) << "sub_op_modify " << opname 
            << " " << poid 
            << " v " << op->version
-          << " " << op->offset << "~" << op->length
           << dendl;  
 
   // sanity checks
@@ -1551,12 +1588,12 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
   // do op
   int ackerosd = acting[0];
   osd->logger->inc("r_wr");
-  osd->logger->inc("r_wrb", op->length);
+  osd->logger->inc("r_wrb", op->get_data().length());
   
-  if (op->op != CEPH_OSD_OP_WRNOOP) {
+  if (op->ops.size()) {
     prepare_transaction(t, op->reqid,
-                       op->poid, op->op, op->old_version, op->version,
-                       op->offset, op->length, op->get_data(), 
+                       op->poid, op->ops, op->get_data(),
+                       op->old_version, op->version,
                        op->snapset, op->snapc,
                        op->inc_lock, op->pg_trim_to);
   }
@@ -1800,9 +1837,9 @@ bool ReplicatedPG::pull(pobject_t poid)
   // send op
   osd_reqid_t rid;
   tid_t tid = osd->get_tid();
-  
-  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PULL,
-                                  0, 0, 
+  vector<ceph_osd_op> pull(1);
+  pull[0].op = CEPH_OSD_OP_PULL;
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, pull, true,
                                   osd->osdmap->get_epoch(), tid, 0, v);
   subop->data_subset.swap(data_subset);
   // do not include clone_subsets in pull request; we will recalculate this
@@ -1853,7 +1890,11 @@ void ReplicatedPG::push_to_replica(pobject_t poid, int peer)
       osd->store->getattrs(info.pgid.to_coll(), poid, attrset);
       
       osd_reqid_t rid;  // useless?
-      MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, st.st_size,
+      vector<ceph_osd_op> push(1);
+      push[0].op = CEPH_OSD_OP_PUSH;
+      push[0].offset = 0;
+      push[0].length = st.st_size;
+      MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, push, false,
                                       osd->osdmap->get_epoch(), osd->get_tid(), 0, version);
       subop->data_subset.insert(0, st.st_size);
       subop->attrset.swap(attrset);
@@ -1948,7 +1989,11 @@ void ReplicatedPG::push(pobject_t poid, int peer,
   
   // send
   osd_reqid_t rid;  // useless?
-  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, CEPH_OSD_OP_PUSH, 0, size,
+  vector<ceph_osd_op> push(1);
+  push[0].op = CEPH_OSD_OP_PUSH;
+  push[0].offset = 0;
+  push[0].length = size;
+  MOSDSubOp *subop = new MOSDSubOp(rid, info.pgid, poid, push, false,
                                   osd->osdmap->get_epoch(), osd->get_tid(), 0, v);
   subop->data_subset.swap(data_subset);
   subop->clone_subsets.swap(clone_subsets);
@@ -2025,11 +2070,12 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 {
   pobject_t poid = op->poid;
   eversion_t v = op->version;
+  ceph_osd_op& push = op->ops[0];
 
   dout(7) << "op_push " 
           << poid 
           << " v " << v 
-         << " len " << op->length
+         << " len " << push.length
          << " subset " << op->data_subset
          << " data " << op->get_data().length()
           << dendl;
@@ -2066,8 +2112,8 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 
   // determine data/clone subsets
   data_subset = op->data_subset;
-  if (data_subset.empty() && op->length && op->length == op->get_data().length())
-    data_subset.insert(0, op->length);
+  if (data_subset.empty() && push.length && push.length == op->get_data().length())
+    data_subset.insert(0, push.length);
   clone_subsets = op->clone_subsets;
 
   if (is_primary()) {
index a7f471704d43af88db88edb105ed2c2ce1c67491..e41e954e505b13ca40359a047d0a46f7eaf8a8fe 100644 (file)
@@ -116,10 +116,18 @@ protected:
   void _make_clone(ObjectStore::Transaction& t,
                   pobject_t head, pobject_t coid,
                   eversion_t ov, eversion_t v, bufferlist& snaps);
+  void prepare_clone(ObjectStore::Transaction& t, bufferlist& logbl, osd_reqid_t reqid,
+                    pobject_t poid, loff_t old_size,
+                    eversion_t old_version, eversion_t at_version,
+                    SnapSet& snapset, SnapContext& snapc);
+  int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid,
+                       pobject_t poid, __u64& old_size,
+                       ceph_osd_op& op, bufferlist::iterator& bp,
+                       SnapSet& snapset, SnapContext& snapc); 
   void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
-                          pobject_t poid, int op,
+                          pobject_t poid, 
+                          vector<ceph_osd_op>& ops, bufferlist& bl,
                           eversion_t old_version, eversion_t at_version,
-                          off_t offset, off_t length, bufferlist& bl,
                           SnapSet& snapset, SnapContext& snapc,
                           __u32 inc_lock, eversion_t trim_to);
   
index e51ea859e8fbe636a682700dbfc743150bfaaf4d..18d8c2c30969b30f41660154245ee53f59b61667 100644 (file)
@@ -368,7 +368,17 @@ struct pg_stat_t {
 };
 WRITE_CLASS_ENCODER(pg_stat_t)
 
-typedef struct ceph_osd_peer_stat osd_peer_stat_t;
+struct osd_peer_stat_t {
+       struct ceph_timespec stamp;
+       float oprate;
+       float qlen;
+       float recent_qlen;
+       float read_latency;
+       float read_latency_mine;
+       float frac_rd_ops_shed_in;
+       float frac_rd_ops_shed_out;
+} __attribute__ ((packed));
+
 WRITE_RAW_ENCODER(osd_peer_stat_t)
 
 inline ostream& operator<<(ostream& out, const osd_peer_stat_t &stat) {
@@ -461,6 +471,9 @@ inline ostream& operator<<(ostream& out, OSDSuperblock& sb)
 WRITE_CLASS_ENCODER(interval_set<__u64>)
 
 
+
+
+
 /*
  * attached to object head.  describes most recent snap context, and
  * set of existing clones.
index 611a328ed0992ab46a092b4ae79593ae49f0893e..59359121f572761d90ccf066228b2bbb7db4b2e5 100644 (file)
@@ -338,8 +338,11 @@ void Objecter::tick()
 
 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 {
+  assert(m->ops.size() >= 1);
+  int op = m->ops[0].op;
+
   // read or modify?
-  switch (m->get_op()) {
+  switch (op) {
   case CEPH_OSD_OP_READ:
     handle_osd_read_reply(m);
     break;
@@ -348,22 +351,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     handle_osd_stat_reply(m);
     break;
     
-  case CEPH_OSD_OP_WRNOOP:
-  case CEPH_OSD_OP_WRITE:
-  case CEPH_OSD_OP_WRITEFULL:
-  case CEPH_OSD_OP_ZERO:
-  case CEPH_OSD_OP_DELETE:
-  case CEPH_OSD_OP_WRUNLOCK:
-  case CEPH_OSD_OP_WRLOCK:
-  case CEPH_OSD_OP_RDLOCK:
-  case CEPH_OSD_OP_RDUNLOCK:
-  case CEPH_OSD_OP_UPLOCK:
-  case CEPH_OSD_OP_DNLOCK:
+  default:
+    assert(m->is_modify());
     handle_osd_modify_reply(m);
     break;
-
-  default:
-    assert(0);
   }
 }
 
@@ -409,9 +400,10 @@ tid_t Objecter::stat_submit(OSDStat *st)
     int flags = st->flags;
     if (st->onfinish) flags |= CEPH_OSD_OP_ACK;
 
-    MOSDOp *m = new MOSDOp(client_inc, last_tid,
+    MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
-                          CEPH_OSD_OP_STAT, flags);
+                          flags);
+    m->stat();
     if (inc_lock > 0) {
       st->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
@@ -434,10 +426,12 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
     return;
   }
 
+  ceph_osd_op& op = m->ops[0];
+
   dout(7) << "handle_osd_stat_reply " << tid 
-                 << " r=" << m->get_result()
-                 << " size=" << m->get_length()
-                 << dendl;
+         << " r=" << m->get_result()
+         << " size=" << op.length
+         << dendl;
   OSDStat *st = op_stat[ tid ];
   op_stat.erase( tid );
 
@@ -466,7 +460,7 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
   if (m->get_result() < 0) {
     *st->size = 0;
   } else {
-    *st->size = m->get_length();
+    *st->size = op.length;
   }
 
   // finish, clean up
@@ -537,15 +531,14 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
   if (pg.acker() >= 0) {
     int flags = rd->flags;
     if (rd->onfinish) flags |= CEPH_OSD_OP_ACK;
-    MOSDOp *m = new MOSDOp(client_inc, last_tid,
+    MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
-                          CEPH_OSD_OP_READ, flags);
+                          flags);
+    m->read(ex.start, ex.length);
     if (inc_lock > 0) {
       rd->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
     }
-    m->set_length(ex.length);
-    m->set_offset(ex.start);
     m->set_retry_attempt(retry);
     
     int who = pg.acker();
@@ -577,6 +570,8 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
   dout(7) << "handle_osd_read_reply " << tid << dendl;
   OSDRead *rd = op_read[ tid ];
   op_read.erase( tid );
+  
+  ceph_osd_op& op = m->ops[0];
 
   // remove from osd/tid maps
   PG& pg = get_pg( m->get_pg() );
@@ -611,7 +606,7 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
 
   // what buffer offset are we?
   dout(7) << " got frag from " << m->get_oid() << " "
-          << m->get_offset() << "~" << m->get_length()
+          << op.offset << "~" << op.length
           << ", still have " << rd->ops.size() << " more ops" << dendl;
   
   if (rd->ops.empty()) {
@@ -859,17 +854,16 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
            << " osd" << pg.primary()
            << dendl;
   if (pg.primary() >= 0) {
-    MOSDOp *m = new MOSDOp(client_inc, tid,
+    MOSDOp *m = new MOSDOp(client_inc, tid, true,
                           ex.oid, ex.layout, osdmap->get_epoch(),
-                          wr->op, flags);
+                          flags);
+    m->add_simple_op(wr->op, ex.start, ex.length);
     m->set_snap_seq(wr->snapc.seq);
     m->get_snaps() = wr->snapc.snaps;
     if (inc_lock > 0) {
       wr->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
     }
-    m->set_length(ex.length);
-    m->set_offset(ex.start);
     if (usetid > 0)
       m->set_retry_attempt(true);
     
index 9166b534ddba224958650826ddc45cb21226009e..67f508d45a74d5bcf91c30754db2c596dee14ab8 100755 (executable)
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 [ "$CEPH_NUM_MON" == "" ] && CEPH_NUM_MON=3
-[ "$CEPH_NUM_OSD" == "" ] && CEPH_NUM_OSD=4
+[ "$CEPH_NUM_OSD" == "" ] && CEPH_NUM_OSD=1
 [ "$CEPH_NUM_MDS" == "" ] && CEPH_NUM_MDS=1
 
 let new=0