]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: object attr operations
authorSage Weil <sage@newdream.net>
Mon, 10 Nov 2008 04:11:57 +0000 (20:11 -0800)
committerSage Weil <sage@newdream.net>
Mon, 10 Nov 2008 04:11:57 +0000 (20:11 -0800)
src/include/buffer.h
src/include/ceph_fs.h
src/include/cstring.h
src/include/types.h
src/os/ObjectStore.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h
src/osdc/Objecter.h

index 588c5e3a087dbf0b4561288d5959dae3f40845e0..5e23627cd38c594a5918fe1416aa76e2358de98c 100644 (file)
@@ -786,6 +786,9 @@ public:
        append_buffer.set_length(0);   // unused, so far.
       }
     }
+    void append(const string& s) {
+      append(s.data(), s.length());
+    }
     void append(const ptr& bp) {
       if (bp.length())
        push_back(bp);
index e9527f053c5b123b725f12fb249a9dad6a9f9fed..e38f56edc86355f80f006a7062dbc38e311cd2ac 100644 (file)
  * whenever the wire protocol changes.  try to keep this string length
  * constant.
  */
-#define CEPH_BANNER "ceph 004\n"
+#define CEPH_BANNER "ceph 005\n"
 #define CEPH_BANNER_MAX_LEN 30
 
 /*
  * subprotocol versions.  when specific messages types or high-level
  * protocols change, bump the affected components.
  */
-#define CEPH_OSD_PROTOCOL    1
+#define CEPH_OSD_PROTOCOL    2
 #define CEPH_MDS_PROTOCOL    2
 #define CEPH_MON_PROTOCOL    2
 #define CEPH_CLIENT_PROTOCOL 1
@@ -1036,54 +1036,76 @@ struct ceph_mds_snap_realm {
 /*
  * osd ops
  */
+#define CEPH_OSD_OP_MODE       0xf00
+#define CEPH_OSD_OP_MODE_RD    0x100
+#define CEPH_OSD_OP_MODE_WR    0x200
+#define CEPH_OSD_OP_MODE_SUB   0x400
+
+#define CEPH_OSD_OP_TYPE       0x0f0
+#define CEPH_OSD_OP_TYPE_LOCK  0x300
+#define CEPH_OSD_OP_TYPE_DATA  0x010
+#define CEPH_OSD_OP_TYPE_ATTR  0x020
+
 enum {
        /* read */
-       CEPH_OSD_OP_READ       = 1,
-       CEPH_OSD_OP_STAT       = 2,
-       CEPH_OSD_OP_GETXATTR   = 3,
-       CEPH_OSD_OP_GETXATTRS  = 4,
-
-       /* modify */
-       CEPH_OSD_OP_WRNOOP     = 10, /* write no-op (i.e. sync) */
-       CEPH_OSD_OP_WRITE      = 11, /* write extent */
-       CEPH_OSD_OP_DELETE     = 12, /* delete object */
-       CEPH_OSD_OP_TRUNCATE   = 13,
-       CEPH_OSD_OP_ZERO       = 14, /* zero extent */
-       CEPH_OSD_OP_WRITEFULL  = 15, /* write complete object */
-       CEPH_OSD_OP_SETXATTR   = 16,
-       CEPH_OSD_OP_SETXATTRS  = 17,
-       CEPH_OSD_OP_RMXATTR    = 18,
+       CEPH_OSD_OP_READ       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
+       CEPH_OSD_OP_STAT       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,
 
-       /* lock */
-       CEPH_OSD_OP_WRLOCK     = 20,
-       CEPH_OSD_OP_WRUNLOCK   = 21,
-       CEPH_OSD_OP_RDLOCK     = 22,
-       CEPH_OSD_OP_RDUNLOCK   = 23,
-       CEPH_OSD_OP_UPLOCK     = 24,
-       CEPH_OSD_OP_DNLOCK     = 25,
+       CEPH_OSD_OP_GETXATTR   = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
+       CEPH_OSD_OP_GETXATTRS  = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
 
        /* subop */
-       CEPH_OSD_OP_PULL       = 30,
-       CEPH_OSD_OP_PUSH       = 31,
-       CEPH_OSD_OP_BALANCEREADS   = 40,
-       CEPH_OSD_OP_UNBALANCEREADS = 41
+       CEPH_OSD_OP_PULL           = CEPH_OSD_OP_MODE_SUB | 1,
+       CEPH_OSD_OP_PUSH           = CEPH_OSD_OP_MODE_SUB | 2,
+       CEPH_OSD_OP_BALANCEREADS   = CEPH_OSD_OP_MODE_SUB | 3,
+       CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
+
+       /* object data */
+       CEPH_OSD_OP_WRITE      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
+       CEPH_OSD_OP_WRITEFULL  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
+       CEPH_OSD_OP_TRUNCATE   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 3,
+       CEPH_OSD_OP_ZERO       = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 4,
+       CEPH_OSD_OP_DELETE     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 5,
+
+       /* object attrs */
+       CEPH_OSD_OP_SETXATTR   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1,
+       CEPH_OSD_OP_SETXATTRS  = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 2,
+       CEPH_OSD_OP_RESETXATTRS= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 3,
+       CEPH_OSD_OP_RMXATTR    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4,
+
+       /* lock */
+       CEPH_OSD_OP_WRLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
+       CEPH_OSD_OP_WRUNLOCK   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 2,
+       CEPH_OSD_OP_RDLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 3,
+       CEPH_OSD_OP_RDUNLOCK   = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 4,
+       CEPH_OSD_OP_UPLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 5,
+       CEPH_OSD_OP_DNLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
 };
 
-static inline int ceph_osd_op_is_read(int op)
+static inline int ceph_osd_op_type_lock(int op)
 {
-       return op < 10;
+       return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_LOCK;
 }
-static inline int ceph_osd_op_is_modify(int op)
+static inline int ceph_osd_op_type_data(int op)
 {
-       return op >= 10 && op < 20;
+       return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_DATA;
 }
-static inline int ceph_osd_op_is_lock(int op)
+static inline int ceph_osd_op_type_attr(int op)
 {
-       return op >= 20 && op < 30;
+       return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_ATTR;
 }
-static inline int ceph_osd_op_is_subop(int op)
+
+static inline int ceph_osd_op_mode_subop(int op)
 {
-       return op >= 30 && op < 40;
+       return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_SUB;
+}
+static inline int ceph_osd_op_mode_read(int op)
+{
+       return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_RD;
+}
+static inline int ceph_osd_op_mode_modify(int op)
+{
+       return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR;
 }
 
 static inline const char *ceph_osd_op_name(int op)
@@ -1092,13 +1114,17 @@ static inline const char *ceph_osd_op_name(int op)
        case CEPH_OSD_OP_READ: return "read";
        case CEPH_OSD_OP_STAT: return "stat";
 
-       case CEPH_OSD_OP_WRNOOP: return "wrnoop";
        case CEPH_OSD_OP_WRITE: return "write";
        case CEPH_OSD_OP_DELETE: return "delete";
        case CEPH_OSD_OP_TRUNCATE: return "truncate";
        case CEPH_OSD_OP_ZERO: return "zero";
        case CEPH_OSD_OP_WRITEFULL: return "writefull";
 
+       case CEPH_OSD_OP_SETXATTR: return "setxattr";
+       case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
+       case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
+       case CEPH_OSD_OP_RMXATTR: return "rmxattr";
+
        case CEPH_OSD_OP_WRLOCK: return "wrlock";
        case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
        case CEPH_OSD_OP_RDLOCK: return "rdlock";
index 89bed6344c153e76d983c6b3e62538edbd0d03a7..7254e92ad058cdd693c4241320d8166718f98300 100644 (file)
@@ -47,8 +47,8 @@ class cstring {
   // accessors
   int length() const { return _len; }
   bool empty() const { return _len == 0; }
-  const char *c_str() const { return _data; }
-  const char *data() const { return _data; }
+  char *c_str() const { return _data; }
+  char *data() const { return _data; }
 
   //const char *operator() const { return _data; }
 
@@ -77,6 +77,10 @@ class cstring {
     _data[_len] = 0;
     return *this;
   }
+  char &operator[](int n) {
+    assert(n < _len);
+    return _data[n];
+  }
   void swap(cstring &other) {
     int tlen = _len;
     char *tdata = _data;
index aa99e564375f990e28cbf7b2d6d12cdeb22ba74e..7a7c463b6e43c850eef6108667ba1ab9618f53b1 100644 (file)
@@ -376,7 +376,10 @@ inline ostream& operator<<(ostream& out, const ceph_fsid& f) {
 
 inline ostream& operator<<(ostream& out, const ceph_osd_op& op) {
   out << ceph_osd_op_name(op.op);
-  out << " " << op.offset << "~" << op.length;
+  if (ceph_osd_op_type_data(op.op))
+    out << " " << op.offset << "~" << op.length;
+  else if (ceph_osd_op_type_attr(op.op))
+    out << " " << op.name_len << "+" << op.value_len;
   return out;
 }
 
index 8cb7111033bb9fe35d278ae8e3850c37d0f8bd20..01cde7fb65faebe4c0a2b89db184957f9a3f89f5 100644 (file)
@@ -20,6 +20,7 @@
 #include "include/Context.h"
 #include "include/buffer.h"
 #include "include/pobject.h"
+#include "include/nstring.h"
 
 #include "include/Distribution.h"
 
@@ -108,7 +109,7 @@ public:
     // for these guys, just use a pointer.
     // but, decode to a full value, and create pointers to that.
     vector<const char*> attrnames;
-    vector<string> attrnames2;
+    vector<nstring> attrnames2;
     vector<map<string,bufferptr> *> attrsets;
     vector<map<string,bufferptr> > attrsets2;
 
@@ -205,6 +206,11 @@ public:
       bl.append((char*)val, len);
       setattr(cid, oid, name, bl);
     }
+    void setattr(coll_t cid, pobject_t oid, nstring& s, bufferlist& val) {
+      attrnames2.push_back(nstring());
+      attrnames2.back().swap(s);
+      setattr(cid, oid, attrnames2.back().c_str(), val);
+    }
     void setattr(coll_t cid, pobject_t oid, const char* name, bufferlist& val) {
       int op = OP_SETATTR;
       ops.push_back(op);
@@ -224,6 +230,11 @@ public:
       len++;
       blen += 5 + attrset.size();     // HACK allowance for removing old attrs
     }
+    void rmattr(coll_t cid, pobject_t oid, nstring& s) {
+      attrnames2.push_back(nstring());
+      attrnames2.back().swap(s);
+      rmattr(cid, oid, attrnames2.back().c_str());
+    }
     void rmattr(coll_t cid, pobject_t oid, const char* name) {
       int op = OP_RMATTR;
       ops.push_back(op);
@@ -348,7 +359,7 @@ public:
       ::decode(cids, bl);
       ::decode(lengths, bl);
       ::decode(attrnames2, bl);
-      for (vector<string>::iterator p = attrnames2.begin();
+      for (vector<nstring>::iterator p = attrnames2.begin();
           p != attrnames2.end();
           ++p)
        attrnames.push_back((*p).c_str());
index d99663400072126bab507ceca9b69b26c6443a6d..c2528c638f533c2d171872061273bd6b6b917590 100644 (file)
@@ -775,6 +775,8 @@ void ReplicatedPG::op_read(MOSDOp *op)
 // ========================================================================
 // MODIFY
 
+
+
 void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
                               pobject_t head, pobject_t coid,
                               eversion_t ov, eversion_t v, bufferlist& snapsbl)
@@ -909,7 +911,7 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
     break;
 
 
-    // -- modify --
+    // -- object data --
 
   case CEPH_OSD_OP_WRITE:
     { // write
@@ -981,6 +983,29 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
     }
     break;
     
+
+    // -- object data --
+  case CEPH_OSD_OP_SETXATTR:
+    {
+      nstring name(op.name_len + 1);
+      name[0] = '_';
+      bp.copy(op.name_len, name.data()+1);
+      bufferlist bl;
+      bp.copy(op.value_len, bl);
+      t.setattr(info.pgid.to_coll(), poid, name, bl);
+    }
+    break;
+
+  case CEPH_OSD_OP_RMXATTR:
+    {
+      nstring name(op.name_len + 1);
+      name[0] = '_';
+      bp.copy(op.name_len, name.data()+1);
+      t.rmattr(info.pgid.to_coll(), poid, name);
+    }
+    break;
+    
+
   default:
     return -EINVAL;
   }
@@ -1012,7 +1037,7 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
   for (unsigned i=0; i<ops.size(); i++) {
     // clone?
     if (!did_snap && poid.oid.snap &&
-       ceph_osd_op_is_modify(ops[i].op)) {     // is a (non-lock) modification
+       !ceph_osd_op_type_lock(ops[i].op)) {     // is a (non-lock) modification
       prepare_clone(t, log_bl, reqid, poid, old_size, old_version, at_version, snapset, snapc);
       did_snap = true;
     }
@@ -1517,9 +1542,6 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   // note my stats
   utime_t now = g_clock.now();
 
-  // permute operation?
-  // ...
-
   // issue replica writes
   tid_t rep_tid = osd->get_tid();
   RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc);
index e41e954e505b13ca40359a047d0a46f7eaf8a8fe..1a480f16f4312df679a3f47394381a9bc82d2aaf 100644 (file)
@@ -109,6 +109,7 @@ protected:
            map<pobject_t, interval_set<__u64> >& clone_subsets);
   bool pull(pobject_t oid);
 
+
   // modify
   void op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru);
   void sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete);
index 04896ac55e557be6cd1cfea00ac447c08cd0aa41..87d382d4460246a93ed4e58cc8077413ff5ca6a1 100644 (file)
@@ -417,6 +417,83 @@ inline ostream& operator<<(ostream& out, ObjectExtent &ex)
 
 
 
+// -----------------------------------------
+
+
+struct ObjectMutation {
+  vector<ceph_osd_op> ops;
+  bufferlist data;
+  
+  // object data
+  void add_data(int op, __u64 off, __u64 len) {
+    int s = ops.size();
+    ops.resize(s+1);
+    memset(&ops[s], 0, sizeof(ops[s]));
+    ops[s].op = op;
+    ops[s].offset = off;
+    ops[s].length = len;
+  }
+
+  void write(__u64 off, __u64 len, bufferlist& bl) {
+    add_data(CEPH_OSD_OP_WRITE, off, len);
+    data.claim_append(bl);
+  }
+  void write_full(bufferlist& bl) {
+    add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
+    data.claim_append(bl);
+  }
+  void zero(__u64 off, __u64 len) {
+    add_data(CEPH_OSD_OP_ZERO, off, len);
+  }
+  void remove() {
+    add_data(CEPH_OSD_OP_DELETE, 0, 0);
+  }
+
+  // object attrs
+  void add_xattr(int op, int namelen, int valuelen) {
+    int s = ops.size();
+    ops.resize(s+1);
+    memset(&ops[s], 0, sizeof(ops[s]));
+    ops[s].op = op;
+    ops[s].name_len = namelen;
+    ops[s].value_len = valuelen;
+  }
+  void setxattr(const char *name, const bufferlist& bl) {
+    int l = strlen(name);
+    add_xattr(CEPH_OSD_OP_SETXATTR, l, bl.length());
+    data.append(name, l);
+    data.append(bl);
+  }
+  void setxattr(const char *name, const string& s) {
+    int l = strlen(name);
+    add_xattr(CEPH_OSD_OP_SETXATTR, l, s.length());
+    data.append(name, l);
+    data.append(s);
+  }
+  void rmxattr(const char *name) {
+    int l = strlen(name);
+    add_xattr(CEPH_OSD_OP_RMXATTR, l, 0);
+    data.append(name, l);
+  }
+  void setxattrs(map<string, bufferlist>& attrs) {
+    bufferlist bl;
+    ::encode(attrs, bl);
+    add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
+    data.claim_append(bl);
+  }
+  void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
+    int l = strlen(prefix);
+    bufferlist bl;
+    ::encode(attrs, bl);
+    add_xattr(CEPH_OSD_OP_RESETXATTRS, l, bl.length());
+    data.append(prefix, l);
+    data.claim_append(bl);
+  }
+};
+
+
+
+
 // ---------------------------------------
 
 class OSDSuperblock {
index 3cd0d453065bd15eae95767a2564a4c89bdb6b5d..85addf8edcb3bd1f8fc39e083db20d9ff7c16dc4 100644 (file)
@@ -235,6 +235,12 @@ class Objecter {
     return read(oid, ol, ops, pbl, 0, flags, onfinish);
   }
 
+  tid_t mutate(object_t oid, ceph_object_layout ol, 
+              ObjectMutation& mutation,
+              const SnapContext& snapc, int flags,
+              Context *onack, Context *oncommit) {
+    return modify(oid, ol, mutation.ops, snapc, mutation.data, flags, onack, oncommit);
+  }
   tid_t write(object_t oid, ceph_object_layout ol,
              __u64 off, size_t len, const SnapContext& snapc, bufferlist &bl, int flags,
               Context *onack, Context *oncommit) {