#define CEPH_OSD_OP_TYPE_LOCK 0x0100
#define CEPH_OSD_OP_TYPE_DATA 0x0200
#define CEPH_OSD_OP_TYPE_ATTR 0x0300
+#define CEPH_OSD_OP_TYPE_EXEC 0x0400
enum {
/** data **/
CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
/** exec **/
- CEPH_OSD_OP_LOAD_CLASS = CEPH_OSD_OP_MODE_EXEC | 1,
- CEPH_OSD_OP_EXEC = CEPH_OSD_OP_MODE_EXEC | 2,
+ CEPH_OSD_OP_RDCALL = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_EXEC | 1,
+ CEPH_OSD_OP_LOAD_CLASS = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_EXEC | 2,
+ CEPH_OSD_OP_WRCALL = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_EXEC | 1,
};
static inline int ceph_osd_op_type_lock(int op)
{
return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_ATTR;
}
+static inline int ceph_osd_op_type_exec(int op)
+{
+ return (op & CEPH_OSD_OP_TYPE) == CEPH_OSD_OP_TYPE_EXEC;
+}
static inline int ceph_osd_op_mode_subop(int op)
{
case CEPH_OSD_OP_UPLOCK: return "uplock";
case CEPH_OSD_OP_DNLOCK: return "dnlock";
- case CEPH_OSD_OP_EXEC: return "exec";
+ case CEPH_OSD_OP_RDCALL: return "rdcall";
+ case CEPH_OSD_OP_WRCALL: return "wrcall";
default: return "???";
}
__le64 truncate_size;
__le32 truncate_seq;
};
+ struct {
+ __u8 class_len;
+ __u8 method_len;
+ __le32 indata_len;
+ } __attribute__ ((packed));
};
} __attribute__ ((packed));
int RadosClient::exec(object_t& oid, const char *code, off_t data_off, size_t data_len, char *buf, size_t out_len)
{
SnapContext snapc;
- bufferlist bl, obl;
+ bufferlist bl;
utime_t ut = g_clock.now();
Mutex lock("RadosClient::exec");
lock.Lock();
- objecter->exec(oid, layout,
- data_off, data_len, CEPH_NOSNAP, bl, 0,
- &obl, out_len,
- oncommit);
+ ObjectRead rd;
+ bufferlist inbl, outbl;
+ inbl.append("input data", 10);
+ rd.rdcall("test", "foo", inbl);
+ objecter->read(oid, layout, rd, CEPH_NOSNAP, &outbl, 0, oncommit);
- dout(0) << "after write call" << dendl;
+ dout(0) << "after rdcall got " << outbl.length() << " bytes" << dendl;
exec_wait.Wait(lock);
lock.Unlock();
if (out_len)
- memcpy(buf, obl.c_str(), out_len);
+ memcpy(buf, outbl.c_str(), out_len);
return out_len;
}
osd->logger->inc(l_osd_c_rdb, p->length);
break;
- case CEPH_OSD_OP_EXEC:
+ case CEPH_OSD_OP_RDCALL:
{
- dout(0) << "CEPH_OSD_OP_EXEC" << dendl;
+ dout(0) << "CEPH_OSD_OP_RDCALL" << dendl;
- if (!osd->get_class("test", info.pgid, op))
+ string cname, method;
+ bp.copy(p->class_len, cname);
+ bp.copy(p->method_len, method);
+
+ bufferlist indata;
+ ::decode_nohead(p->indata_len, indata, bp);
+
+ if (!osd->get_class(cname, info.pgid, op))
return;
#if 0
ops[s].name_len = namelen;
ops[s].value_len = valuelen;
}
+ void add_call(int op, const char *cname, const char *method, bufferlist &indata) {
+ int s = ops.size();
+ ops.resize(s+1);
+ memset(&ops[s], 0, sizeof(ops[s]));
+ ops[s].op = op;
+ ops[s].class_len = strlen(cname);
+ ops[s].method_len = strlen(method);
+ data.append(cname, ops[s].class_len);
+ data.append(method, ops[s].method_len);
+ data.append(indata);
+ }
};
struct ObjectRead : public ObjectOperation {
void getxattrs() {
add_xattr(CEPH_OSD_OP_GETXATTRS, 0, 0);
}
+
+ void rdcall(const char *cname, const char *method, bufferlist &indata) {
+ add_call(CEPH_OSD_OP_RDCALL, cname, method, indata);
+ }
};
struct ObjectMutation : public ObjectOperation {
utime_t mtime;
+ // exec
+ void wrcall(const char *cname, const char *method, bufferlist &indata) {
+ add_call(CEPH_OSD_OP_WRCALL, cname, method, indata);
+ }
+
// object data
void write(__u64 off, __u64 len, bufferlist& bl) {
add_data(CEPH_OSD_OP_WRITE, off, len);
return read_submit(rd);
}
- tid_t exec(object_t oid, ceph_object_layout ol,
- __u64 data_off, size_t data_len,
- snapid_t snap, bufferlist &bl, int flags,
- bufferlist *pbl, size_t out_len,
- Context *onfinish) {
- vector<ceph_osd_op> ops(1);
- memset(&ops[0], 0, sizeof(ops[0]));
- ops[0].op = CEPH_OSD_OP_EXEC;
- ops[0].offset = data_off;
- ops[0].length = data_len;
- ReadOp *rd = new ReadOp(oid, ol, ops, snap, flags, onfinish);
- rd->bl = bl;
- rd->pbl = pbl;
- return read_submit(rd);
- }
-
tid_t modify(object_t oid, ceph_object_layout ol, vector<ceph_osd_op>& ops,
const SnapContext& snapc, bufferlist &bl, utime_t mtime, int flags,
Context *onack, Context *oncommit) {