From 123540a13d0ee473bf25a9ec4c3aec4411e639af Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 5 May 2009 22:45:00 -0700 Subject: [PATCH] c3: implement exec poc --- src/include/rados.h | 4 +++ src/osd/ReplicatedPG.cc | 59 ++++++++++++++++++----------------------- src/osdc/Objecter.h | 25 ++++++++++------- src/s3/c3.cc | 44 ++++++++++++------------------ 4 files changed, 62 insertions(+), 70 deletions(-) diff --git a/src/include/rados.h b/src/include/rados.h index c3c7a4e32507d..18073d1977289 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -162,6 +162,7 @@ struct ceph_eversion { #define CEPH_OSD_OP_MODE_RD 0x1000 #define CEPH_OSD_OP_MODE_WR 0x2000 #define CEPH_OSD_OP_MODE_SUB 0x4000 +#define CEPH_OSD_OP_MODE_EXEC 0x8000 #define CEPH_OSD_OP_TYPE 0x0f00 #define CEPH_OSD_OP_TYPE_LOCK 0x0100 @@ -216,6 +217,9 @@ enum { CEPH_OSD_OP_RDUNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 4, CEPH_OSD_OP_UPLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 5, CEPH_OSD_OP_DNLOCK = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6, + + /** exec **/ + CEPH_OSD_OP_EXEC = CEPH_OSD_OP_MODE_EXEC | 1, }; static inline int ceph_osd_op_type_lock(int op) diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 34f515529619c..107064930cf6c 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -738,7 +738,32 @@ void ReplicatedPG::op_read(MOSDOp *op) osd->logger->inc(l_osd_c_rd); osd->logger->inc(l_osd_c_rdb, p->length); break; - + + case CEPH_OSD_OP_EXEC: + { + bufferlist bl; + int r = osd->store->read(info.pgid.to_coll(), poid, p->offset, p->length, bl); + + if (data.length() == 0) + data_off = p->offset; + + if (r >= 0) { + p->length = r; + char *buf = bl.c_str(); + for (unsigned int i=0; ilength = 0; + } + dout(10) << " exec got " << r << " / " << p->length << " bytes from obj " << oid << dendl; + dout(10) << " exec reply=" << data.c_str() << dendl; + } + break; + case CEPH_OSD_OP_STAT: { struct stat st; @@ -1078,38 +1103,6 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req } break; - case CEPH_OSD_OP_EXEC: - { // write full object - // read into a buffer - bufferlist bl; - int r = osd->store->read(info.pgid.to_coll(), poid, op.offset, op.length, bl); -#if 0 - if (data.length() == 0) - data_off = p->offset; - data.claim(bl); -#endif - if (r >= 0) { - op.length = r; - - cerr << bl.c_str() << std::endl; -#if 0 - bufferlist nbl, bl; - - if (data.length() == 0) - data_off = p->offset; - data.claim(bl); -#endif - } else { -#if 0 - result = r; - p->length = 0; -#endif - } - dout(10) << " exec got " << r << " / " << op.length << " bytes from obj " /* << poid */ << dendl; - break; - } - break; - case CEPH_OSD_OP_ZERO: { // zero assert(op.length); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 2990c4284391a..ac078c851c1f0 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -318,6 +318,21 @@ class Objecter { return read_submit(rd); } + tid_t exec(object_t oid, ceph_object_layout ol, + __u64 data_off, size_t data_len, bufferlist &bl, int flags, + bufferlist *pbl, size_t out_len, + Context *onfinish) { + vector ops(1); + memset(&ops[0], 0, sizeof(ops[0])); + ops[0].op = CEPH_OSD_OP_EXEC; + ops[0].offset = data_off; + ops[0].length = data_len; + ReadOp *rd = new ReadOp(oid, ol, ops, flags, onfinish); + rd->bl = bl; + rd->pbl = pbl; + return read_submit(rd); + } + tid_t modify(object_t oid, ceph_object_layout ol, vector& ops, const SnapContext& snapc, bufferlist &bl, utime_t mtime, int flags, Context *onack, Context *oncommit) { @@ -378,16 +393,6 @@ class Objecter { ops[0].length = bl.length(); return modify(oid, ol, ops, snapc, bl, mtime, flags, onack, oncommit); } - tid_t exec(object_t oid, ceph_object_layout ol, - __u64 off, size_t len, const SnapContext& snapc, bufferlist &bl, utime_t mtime, int flags, - Context *onack, Context *oncommit) { - vector ops(1); - memset(&ops[0], 0, sizeof(ops[0])); - ops[0].op = CEPH_OSD_OP_EXEC; - ops[0].offset = off; - ops[0].length = len; - return modify(oid, ol, ops, snapc, bl, mtime, flags, onack, oncommit); - } tid_t zero(object_t oid, ceph_object_layout ol, __u64 off, size_t len, const SnapContext& snapc, utime_t mtime, int flags, Context *onack, Context *oncommit) { diff --git a/src/s3/c3.cc b/src/s3/c3.cc index 546b530dde691..248dffde67b2c 100644 --- a/src/s3/c3.cc +++ b/src/s3/c3.cc @@ -97,21 +97,6 @@ class C3 : public Dispatcher } } }; - class C_ExecAck : public Context { - object_t oid; - loff_t start; - size_t *length; - Cond *pcond; - public: - tid_t tid; - C_ExecAck(object_t o, loff_t s, size_t *l, Cond *cond) : oid(o), start(s), length(l), pcond(cond) {} - void finish(int r) { - if (pcond) { - *length = r; - pcond->Signal(); - } - } - }; class C_ExecCommit : public Context { object_t oid; loff_t start; @@ -150,7 +135,7 @@ public: bool init(); int write(object_t& oid, const char *buf, off_t off, size_t len); - int exec(object_t& oid, const char *buf, off_t off, size_t len); + int exec(object_t& oid, const char *code, off_t data_off, size_t data_len, char *buf, size_t out_len); int read(object_t& oid, char *buf, off_t off, size_t len); }; @@ -302,18 +287,17 @@ int C3::write(object_t& oid, const char *buf, off_t off, size_t len) return len; } -int C3::exec(object_t& oid, const char *buf, off_t off, size_t len) +int C3::exec(object_t& oid, const char *code, off_t data_off, size_t data_len, char *buf, size_t out_len) { SnapContext snapc; - bufferlist bl; + bufferlist bl, obl; utime_t ut = g_clock.now(); Mutex lock("C3::exec"); Cond exec_wait; - bl.append(&buf[off], len); + bl.append(code, strlen(code) + 1); - C_ExecAck *onack = new C_ExecAck(oid, off, &len, &exec_wait); - C_ExecCommit *oncommit = new C_ExecCommit(oid, off, &len, NULL); + C_ExecCommit *oncommit = new C_ExecCommit(oid, data_off, &out_len, &exec_wait); ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, 0); @@ -322,8 +306,9 @@ int C3::exec(object_t& oid, const char *buf, off_t off, size_t len) lock.Lock(); objecter->exec(oid, layout, - off, len, snapc, bl, ut, 0, - onack, oncommit); + data_off, data_len, bl, 0, + &obl, out_len, + oncommit); dout(0) << "after write call" << dendl; @@ -331,7 +316,10 @@ int C3::exec(object_t& oid, const char *buf, off_t off, size_t len) lock.Unlock(); - return len; + if (out_len) + memcpy(buf, obl.c_str(), out_len); + + return out_len; } int C3::read(object_t& oid, char *buf, off_t off, size_t len) @@ -431,9 +419,9 @@ extern "C" int c3_read(object_t *oid, char *buf, off_t off, size_t len) return c3p->read(*oid, buf, off, len); } -extern "C" int c3_exec(object_t *oid, char *buf, off_t off, size_t len) +extern "C" int c3_exec(object_t *oid, const char *code, off_t off, size_t len, char *buf, size_t out_len) { - return c3p->exec(*oid, buf, off, len); + return c3p->exec(*oid, code, off, len, buf, out_len); } @@ -453,9 +441,11 @@ int main(int argc, const char **argv) object_t oid(0x2010, 0); c3_write(&oid, buf, 0, strlen(buf) + 1); + c3_exec(&oid, "code", 0, 128, buf, 128); + cerr << "exec result=" << buf << std::endl; size_t size = c3_read(&oid, buf2, 0, 128); - cerr << "result=" << buf2 << "" << std::endl; + cerr << "read result=" << buf2 << "" << std::endl; cerr << "size=" << size << std::endl; -- 2.39.5