append_buffer.set_length(0); // unused, so far.
}
}
+ void append(const string& s) {
+ append(s.data(), s.length());
+ }
void append(const ptr& bp) {
if (bp.length())
push_back(bp);
* whenever the wire protocol changes. try to keep this string length
* constant.
*/
-#define CEPH_BANNER "ceph 004\n"
+#define CEPH_BANNER "ceph 005\n"
#define CEPH_BANNER_MAX_LEN 30
/*
* subprotocol versions. when specific messages types or high-level
* protocols change, bump the affected components.
*/
-#define CEPH_OSD_PROTOCOL 1
+#define CEPH_OSD_PROTOCOL 2
#define CEPH_MDS_PROTOCOL 2
#define CEPH_MON_PROTOCOL 2
#define CEPH_CLIENT_PROTOCOL 1
/*
* osd ops
*/
+#define CEPH_OSD_OP_MODE 0xf00
+#define CEPH_OSD_OP_MODE_RD 0x100
+#define CEPH_OSD_OP_MODE_WR 0x200
+#define CEPH_OSD_OP_MODE_SUB 0x400
+
+#define CEPH_OSD_OP_TYPE 0x0f0
+#define CEPH_OSD_OP_TYPE_LOCK 0x300
+#define CEPH_OSD_OP_TYPE_DATA 0x010
+#define CEPH_OSD_OP_TYPE_ATTR 0x020
+
enum {
/* read */
- CEPH_OSD_OP_READ = 1,
- CEPH_OSD_OP_STAT = 2,
- CEPH_OSD_OP_GETXATTR = 3,
- CEPH_OSD_OP_GETXATTRS = 4,
-
- /* modify */
- CEPH_OSD_OP_WRNOOP = 10, /* write no-op (i.e. sync) */
- CEPH_OSD_OP_WRITE = 11, /* write extent */
- CEPH_OSD_OP_DELETE = 12, /* delete object */
- CEPH_OSD_OP_TRUNCATE = 13,
- CEPH_OSD_OP_ZERO = 14, /* zero extent */
- CEPH_OSD_OP_WRITEFULL = 15, /* write complete object */
- CEPH_OSD_OP_SETXATTR = 16,
- CEPH_OSD_OP_SETXATTRS = 17,
- CEPH_OSD_OP_RMXATTR = 18,
+ CEPH_OSD_OP_READ = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
+ CEPH_OSD_OP_STAT = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,
- /* lock */
- CEPH_OSD_OP_WRLOCK = 20,
- CEPH_OSD_OP_WRUNLOCK = 21,
- CEPH_OSD_OP_RDLOCK = 22,
- CEPH_OSD_OP_RDUNLOCK = 23,
- CEPH_OSD_OP_UPLOCK = 24,
- CEPH_OSD_OP_DNLOCK = 25,
+ CEPH_OSD_OP_GETXATTR = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
+ CEPH_OSD_OP_GETXATTRS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,
/* subop */
- CEPH_OSD_OP_PULL = 30,
- CEPH_OSD_OP_PUSH = 31,
- CEPH_OSD_OP_BALANCEREADS = 40,
- CEPH_OSD_OP_UNBALANCEREADS = 41
+ CEPH_OSD_OP_PULL = CEPH_OSD_OP_MODE_SUB | 1,
+ CEPH_OSD_OP_PUSH = CEPH_OSD_OP_MODE_SUB | 2,
+ CEPH_OSD_OP_BALANCEREADS = CEPH_OSD_OP_MODE_SUB | 3,
+ CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
+
+ /* object data */
+ CEPH_OSD_OP_WRITE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
+ CEPH_OSD_OP_WRITEFULL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
+ CEPH_OSD_OP_TRUNCATE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 3,
+ CEPH_OSD_OP_ZERO = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 4,
+ CEPH_OSD_OP_DELETE = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 5,
+
+ /* object attrs */
+ CEPH_OSD_OP_SETXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1,
+ CEPH_OSD_OP_SETXATTRS = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 2,
+ CEPH_OSD_OP_RESETXATTRS= CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 3,
+ CEPH_OSD_OP_RMXATTR = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4,
+
+ /* lock */
+ CEPH_OSD_OP_WRLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
+ CEPH_OSD_OP_WRUNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 2,
+ CEPH_OSD_OP_RDLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 3,
+ CEPH_OSD_OP_RDUNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 4,
+ CEPH_OSD_OP_UPLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 5,
+ CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
};
-static inline int ceph_osd_op_is_read(int op)
+static inline int ceph_osd_op_type_lock(int op)
{
- return op < 10;
+ return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_LOCK;
}
-static inline int ceph_osd_op_is_modify(int op)
+static inline int ceph_osd_op_type_data(int op)
{
- return op >= 10 && op < 20;
+ return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_DATA;
}
-static inline int ceph_osd_op_is_lock(int op)
+static inline int ceph_osd_op_type_attr(int op)
{
- return op >= 20 && op < 30;
+ return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_ATTR;
}
-static inline int ceph_osd_op_is_subop(int op)
+
+static inline int ceph_osd_op_mode_subop(int op)
{
- return op >= 30 && op < 40;
+ return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_SUB;
+}
+static inline int ceph_osd_op_mode_read(int op)
+{
+ return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_RD;
+}
+static inline int ceph_osd_op_mode_modify(int op)
+{
+ return (op & CEPH_OSD_OP_MODE) == CEPH_OSD_OP_MODE_WR;
}
static inline const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_READ: return "read";
case CEPH_OSD_OP_STAT: return "stat";
- case CEPH_OSD_OP_WRNOOP: return "wrnoop";
case CEPH_OSD_OP_WRITE: return "write";
case CEPH_OSD_OP_DELETE: return "delete";
case CEPH_OSD_OP_TRUNCATE: return "truncate";
case CEPH_OSD_OP_ZERO: return "zero";
case CEPH_OSD_OP_WRITEFULL: return "writefull";
+ case CEPH_OSD_OP_SETXATTR: return "setxattr";
+ case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
+ case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
+ case CEPH_OSD_OP_RMXATTR: return "rmxattr";
+
case CEPH_OSD_OP_WRLOCK: return "wrlock";
case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
case CEPH_OSD_OP_RDLOCK: return "rdlock";
// accessors
int length() const { return _len; }
bool empty() const { return _len == 0; }
- const char *c_str() const { return _data; }
- const char *data() const { return _data; }
+ char *c_str() const { return _data; }
+ char *data() const { return _data; }
//const char *operator() const { return _data; }
_data[_len] = 0;
return *this;
}
+ char &operator[](int n) {
+ assert(n < _len);
+ return _data[n];
+ }
void swap(cstring &other) {
int tlen = _len;
char *tdata = _data;
inline ostream& operator<<(ostream& out, const ceph_osd_op& op) {
out << ceph_osd_op_name(op.op);
- out << " " << op.offset << "~" << op.length;
+ if (ceph_osd_op_type_data(op.op))
+ out << " " << op.offset << "~" << op.length;
+ else if (ceph_osd_op_type_attr(op.op))
+ out << " " << op.name_len << "+" << op.value_len;
return out;
}
#include "include/Context.h"
#include "include/buffer.h"
#include "include/pobject.h"
+#include "include/nstring.h"
#include "include/Distribution.h"
// for these guys, just use a pointer.
// but, decode to a full value, and create pointers to that.
vector<const char*> attrnames;
- vector<string> attrnames2;
+ vector<nstring> attrnames2;
vector<map<string,bufferptr> *> attrsets;
vector<map<string,bufferptr> > attrsets2;
bl.append((char*)val, len);
setattr(cid, oid, name, bl);
}
+ void setattr(coll_t cid, pobject_t oid, nstring& s, bufferlist& val) {
+ attrnames2.push_back(nstring());
+ attrnames2.back().swap(s);
+ setattr(cid, oid, attrnames2.back().c_str(), val);
+ }
void setattr(coll_t cid, pobject_t oid, const char* name, bufferlist& val) {
int op = OP_SETATTR;
ops.push_back(op);
len++;
blen += 5 + attrset.size(); // HACK allowance for removing old attrs
}
+ void rmattr(coll_t cid, pobject_t oid, nstring& s) {
+ attrnames2.push_back(nstring());
+ attrnames2.back().swap(s);
+ rmattr(cid, oid, attrnames2.back().c_str());
+ }
void rmattr(coll_t cid, pobject_t oid, const char* name) {
int op = OP_RMATTR;
ops.push_back(op);
::decode(cids, bl);
::decode(lengths, bl);
::decode(attrnames2, bl);
- for (vector<string>::iterator p = attrnames2.begin();
+ for (vector<nstring>::iterator p = attrnames2.begin();
p != attrnames2.end();
++p)
attrnames.push_back((*p).c_str());
// ========================================================================
// MODIFY
+
+
void ReplicatedPG::_make_clone(ObjectStore::Transaction& t,
pobject_t head, pobject_t coid,
eversion_t ov, eversion_t v, bufferlist& snapsbl)
break;
- // -- modify --
+ // -- object data --
case CEPH_OSD_OP_WRITE:
{ // write
}
break;
+
+ // -- object data --
+ case CEPH_OSD_OP_SETXATTR:
+ {
+ nstring name(op.name_len + 1);
+ name[0] = '_';
+ bp.copy(op.name_len, name.data()+1);
+ bufferlist bl;
+ bp.copy(op.value_len, bl);
+ t.setattr(info.pgid.to_coll(), poid, name, bl);
+ }
+ break;
+
+ case CEPH_OSD_OP_RMXATTR:
+ {
+ nstring name(op.name_len + 1);
+ name[0] = '_';
+ bp.copy(op.name_len, name.data()+1);
+ t.rmattr(info.pgid.to_coll(), poid, name);
+ }
+ break;
+
+
default:
return -EINVAL;
}
for (unsigned i=0; i<ops.size(); i++) {
// clone?
if (!did_snap && poid.oid.snap &&
- ceph_osd_op_is_modify(ops[i].op)) { // is a (non-lock) modification
+ !ceph_osd_op_type_lock(ops[i].op)) { // is a (non-lock) modification
prepare_clone(t, log_bl, reqid, poid, old_size, old_version, at_version, snapset, snapc);
did_snap = true;
}
// note my stats
utime_t now = g_clock.now();
- // permute operation?
- // ...
-
// issue replica writes
tid_t rep_tid = osd->get_tid();
RepGather *repop = new_rep_gather(op, rep_tid, av, snapset, snapc);
map<pobject_t, interval_set<__u64> >& clone_subsets);
bool pull(pobject_t oid);
+
// modify
void op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru);
void sub_op_modify_commit(MOSDSubOp *op, int ackerosd, eversion_t last_complete);
+// -----------------------------------------
+
+
+struct ObjectMutation {
+ vector<ceph_osd_op> ops;
+ bufferlist data;
+
+ // object data
+ void add_data(int op, __u64 off, __u64 len) {
+ int s = ops.size();
+ ops.resize(s+1);
+ memset(&ops[s], 0, sizeof(ops[s]));
+ ops[s].op = op;
+ ops[s].offset = off;
+ ops[s].length = len;
+ }
+
+ void write(__u64 off, __u64 len, bufferlist& bl) {
+ add_data(CEPH_OSD_OP_WRITE, off, len);
+ data.claim_append(bl);
+ }
+ void write_full(bufferlist& bl) {
+ add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
+ data.claim_append(bl);
+ }
+ void zero(__u64 off, __u64 len) {
+ add_data(CEPH_OSD_OP_ZERO, off, len);
+ }
+ void remove() {
+ add_data(CEPH_OSD_OP_DELETE, 0, 0);
+ }
+
+ // object attrs
+ void add_xattr(int op, int namelen, int valuelen) {
+ int s = ops.size();
+ ops.resize(s+1);
+ memset(&ops[s], 0, sizeof(ops[s]));
+ ops[s].op = op;
+ ops[s].name_len = namelen;
+ ops[s].value_len = valuelen;
+ }
+ void setxattr(const char *name, const bufferlist& bl) {
+ int l = strlen(name);
+ add_xattr(CEPH_OSD_OP_SETXATTR, l, bl.length());
+ data.append(name, l);
+ data.append(bl);
+ }
+ void setxattr(const char *name, const string& s) {
+ int l = strlen(name);
+ add_xattr(CEPH_OSD_OP_SETXATTR, l, s.length());
+ data.append(name, l);
+ data.append(s);
+ }
+ void rmxattr(const char *name) {
+ int l = strlen(name);
+ add_xattr(CEPH_OSD_OP_RMXATTR, l, 0);
+ data.append(name, l);
+ }
+ void setxattrs(map<string, bufferlist>& attrs) {
+ bufferlist bl;
+ ::encode(attrs, bl);
+ add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
+ data.claim_append(bl);
+ }
+ void resetxattrs(const char *prefix, map<string, bufferlist>& attrs) {
+ int l = strlen(prefix);
+ bufferlist bl;
+ ::encode(attrs, bl);
+ add_xattr(CEPH_OSD_OP_RESETXATTRS, l, bl.length());
+ data.append(prefix, l);
+ data.claim_append(bl);
+ }
+};
+
+
+
+
// ---------------------------------------
class OSDSuperblock {
return read(oid, ol, ops, pbl, 0, flags, onfinish);
}
+ tid_t mutate(object_t oid, ceph_object_layout ol,
+ ObjectMutation& mutation,
+ const SnapContext& snapc, int flags,
+ Context *onack, Context *oncommit) {
+ return modify(oid, ol, mutation.ops, snapc, mutation.data, flags, onack, oncommit);
+ }
tid_t write(object_t oid, ceph_object_layout ol,
__u64 off, size_t len, const SnapContext& snapc, bufferlist &bl, int flags,
Context *onack, Context *oncommit) {