Mutex lock;
Cond cond;
+
public:
RadosClient() : messenger(NULL), mc(NULL), lock("radosclient") {}
~RadosClient();
int remove(int pool, object_t& oid);
int exec(int pool, object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
+
+
+ // --- aio ---
+ struct AioCompletion {
+ Mutex lock;
+ Cond cond;
+ int ref, rval;
+ bool ack, safe;
+
+ // for read
+ bufferlist bl, *pbl;
+ char *buf;
+ unsigned maxlen;
+
+ AioCompletion() : lock("RadosClient::AioCompletion"),
+ ref(1), rval(0), ack(false), safe(false), pbl(0), buf(0), maxlen(0) {}
+
+ int wait_for_complete() {
+ lock.Lock();
+ while (!ack)
+ cond.Wait(lock);
+ lock.Unlock();
+ return 0;
+ }
+ int wait_for_safe() {
+ lock.Lock();
+ while (!safe)
+ cond.Wait(lock);
+ lock.Unlock();
+ return 0;
+ }
+ int is_complete() {
+ lock.Lock();
+ int r = ack;
+ lock.Unlock();
+ return r;
+ }
+ int is_safe() {
+ lock.Lock();
+ int r = safe;
+ lock.Unlock();
+ return r;
+ }
+ int get_return_value() {
+ lock.Lock();
+ int r = rval;
+ lock.Unlock();
+ return r;
+ }
+ void put() {
+ lock.Lock();
+ int n = --ref;
+ lock.Unlock();
+ if (!n)
+ delete this;
+ }
+ };
+
+ struct C_aio_Ack : public Context {
+ AioCompletion *c;
+ void finish(int r) {
+ c->lock.Lock();
+ c->rval = r;
+ c->ack = true;
+ c->cond.Signal();
+
+ if (c->buf && c->bl.length() > 0) {
+ unsigned l = MIN(c->bl.length(), c->maxlen);
+ c->bl.copy(0, l, c->buf);
+ c->rval = c->bl.length();
+ }
+ if (c->pbl) {
+ *c->pbl = c->bl;
+ }
+
+ int n = --c->ref;
+ c->lock.Unlock();
+ if (!n)
+ delete c;
+ }
+ C_aio_Ack(AioCompletion *_c) : c(_c) {}
+ };
+
+ struct C_aio_Safe : public Context {
+ AioCompletion *c;
+ void finish(int r) {
+ c->lock.Lock();
+ if (!c->ack) {
+ c->rval = r;
+ c->ack = true;
+ }
+ c->safe = true;
+ c->cond.Signal();
+ int n = --c->ref;
+ c->lock.Unlock();
+ if (!n)
+ delete c;
+ }
+ C_aio_Safe(AioCompletion *_c) : c(_c) {}
+ };
+
+ int aio_read(int pool, object_t oid, off_t off, bufferlist *pbl, size_t len,
+ AioCompletion **pc);
+ int aio_read(int pool, object_t oid, off_t off, char *buf, size_t len,
+ AioCompletion **pc);
+
+ int aio_write(int pool, object_t oid, off_t off, bufferlist& bl, size_t len,
+ AioCompletion **pc);
+
};
bool RadosClient::init()
return len;
}
+int RadosClient::aio_read(int pool, object_t oid, off_t off, bufferlist *pbl, size_t len,
+ AioCompletion **pc)
+{
+ AioCompletion *c = new AioCompletion;
+ Context *onack = new C_aio_Ack(c);
+
+ c->pbl = pbl;
+
+ ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+ objecter->read(oid, layout,
+ off, len, CEPH_NOSNAP, &c->bl, 0,
+ onack);
+
+ *pc = c;
+ return 0;
+}
+int RadosClient::aio_read(int pool, object_t oid, off_t off, char *buf, size_t len,
+ AioCompletion **pc)
+{
+ AioCompletion *c = new AioCompletion;
+ Context *onack = new C_aio_Ack(c);
+
+ c->buf = buf;
+ c->maxlen = len;
+
+ ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+ objecter->read(oid, layout,
+ off, len, CEPH_NOSNAP, &c->bl, 0,
+ onack);
+
+ *pc = c;
+ return 0;
+}
+
+int RadosClient::aio_write(int pool, object_t oid, off_t off, bufferlist& bl, size_t len,
+ AioCompletion **pc)
+{
+ SnapContext snapc;
+ utime_t ut = g_clock.now();
+
+ AioCompletion *c = new AioCompletion;
+ Context *onack = new C_aio_Ack(c);
+ Context *onsafe = new C_aio_Safe(c);
+
+ ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+ objecter->write(oid, layout,
+ off, len, snapc, bl, ut, 0,
+ onack, onsafe);
+
+ *pc = c;
+ return 0;
+}
+
int RadosClient::remove(int pool, object_t& oid)
{
SnapContext snapc;
return ret;
}
+
+
+// -------------------------
+// aio
+
+extern "C" int rados_aio_wait_for_complete(rados_completion_t c)
+{
+ return ((RadosClient::AioCompletion *)c)->wait_for_complete();
+}
+
+extern "C" int rados_aio_wait_for_safe(rados_completion_t c)
+{
+ return ((RadosClient::AioCompletion *)c)->wait_for_safe();
+}
+
+extern "C" int rados_aio_is_complete(rados_completion_t c)
+{
+ return ((RadosClient::AioCompletion *)c)->is_complete();
+}
+
+extern "C" int rados_aio_is_safe(rados_completion_t c)
+{
+ return ((RadosClient::AioCompletion *)c)->is_safe();
+}
+
+extern "C" int rados_aio_get_return_value(rados_completion_t c)
+{
+ return ((RadosClient::AioCompletion *)c)->get_return_value();
+}
+
+extern "C" void rados_aio_free(rados_completion_t c)
+{
+ ((RadosClient::AioCompletion *)c)->put();
+}
+
+extern "C" int rados_aio_read(rados_pool_t pool, struct ceph_object *o,
+ off_t off, char *buf, size_t len,
+ rados_completion_t *completion)
+{
+ object_t oid(*o);
+ return radosp->aio_read(pool, oid, off, buf, len, (RadosClient::AioCompletion**)completion);
+}
+
+extern "C" int rados_aio_write(rados_pool_t pool, struct ceph_object *o,
+ off_t off, const char *buf, size_t len,
+ rados_completion_t *completion)
+{
+ object_t oid(*o);
+ bufferlist bl;
+ bl.append(buf, len);
+ return radosp->aio_write(pool, oid, off, bl, len, (RadosClient::AioCompletion**)completion);
+}