int aio_read(pool_t pool, const std::string& oid, off_t off, bufferlist *pbl, size_t len,
AioCompletion *c);
+ int aio_sparse_read(pool_t pool, const std::string& oid, off_t off, // asynchronous sparse read of object `oid` starting at offset `off`
+ std::map<off_t,size_t> *m, bufferlist *data_bl, size_t len, // m, data_bl: outputs filled in when the read succeeds; len: number of bytes requested
+ AioCompletion *c); // c: completion associated with this request
int aio_write(pool_t pool, const std::string& oid, off_t off, const bufferlist& bl, size_t len,
AioCompletion *c);
+
AioCompletion *aio_create_completion();
AioCompletion *aio_create_completion(void *cb_arg, callback_t cb_complete, callback_t cb_safe);
AioCompletion *aio_create_completion(void *cb_arg, callback_t complete_cb);
int aio_write(image_t image, off_t off, size_t len, bufferlist& bl,
AioCompletion *c);
+ int aio_read(image_t image, off_t off, size_t len, bufferlist& bl, AioCompletion *c); // asynchronous image read of len bytes at off; a len-byte buffer is appended to bl as the destination
/* lower level access */
void get_rados_pools(pool_t pool, librados::pool_t *md_pool, librados::pool_t *data_pool);
}
};
+ struct C_aio_sparse_read_Ack : public Context { // context run when an asynchronous sparse read finishes
+ AioCompletion *c; // completion being finished; c->get() is called on it in the constructor
+ bufferlist *data_bl; // output: receives the data decoded from c->bl; not set by the constructor
+ std::map<off_t,size_t> *m; // output: receives the map decoded from c->bl; not set by the constructor
+
+ void finish(int r) { // r: result of the operation, stored in c->rval
+ c->lock.Lock();
+ c->rval = r;
+ c->ack = true;
+ c->cond.Signal(); // signalled while c->lock is still held
+
+ bufferlist::iterator iter = c->bl.begin();
+ if (r >= 0) { // decode only when the result is non-negative: the map first, then the data
+ ::decode(*m, iter);
+ ::decode(*data_bl, iter);
+ }
+
+ if (c->callback_complete) {
+ rados_callback_t cb = c->callback_complete;
+ void *cb_arg = c->callback_arg;
+ c->lock.Unlock(); // the callback is invoked without c->lock held
+ cb(c, cb_arg);
+ c->lock.Lock();
+ }
+
+ c->put_unlock(); // NOTE(review): presumably balances the constructor's c->get() and releases c->lock -- confirm
+ }
+ C_aio_sparse_read_Ack(AioCompletion *_c) : c(_c) { // only c is initialised here; m and data_bl must be assigned before finish() runs
+ c->get();
+ }
+ };
+
struct C_aio_Safe : public Context {
AioCompletion *c;
void finish(int r) {
int aio_read(PoolCtx& pool, object_t oid, off_t off, char *buf, size_t len,
AioCompletion *c);
+ int aio_sparse_read(PoolCtx& pool, const object_t oid, off_t off, // asynchronous sparse read of object `oid` starting at offset `off`
+ std::map<off_t,size_t> *m, bufferlist *data_bl, size_t len, // m, data_bl: outputs decoded from the reply when the read succeeds; len: bytes requested
+ AioCompletion *c); // c: completion that receives the result
+
int aio_write(PoolCtx& pool, object_t oid, off_t off, const bufferlist& bl, size_t len,
AioCompletion *c);
objecter->read(oid, oloc,
off, len, pool.snap_seq, &c->bl, 0,
onack, &c->objver);
-
return 0;
}
+
int RadosClient::aio_read(PoolCtx& pool, const object_t oid, off_t off, char *buf, size_t len,
AioCompletion *c)
{
return 0;
}
+/*
+ * Queue an asynchronous sparse read of `len` bytes at offset `off` of `oid`.
+ *
+ * The raw reply is collected in c->bl. When the operation finishes, the
+ * C_aio_sparse_read_Ack context decodes it into *m and *data_bl (only if the
+ * result is non-negative) and then runs the completion callback of `c`.
+ * `m` and `data_bl` must therefore stay valid until `c` has completed.
+ *
+ * Always returns 0; the result of the read itself is reported through `c`.
+ */
+int RadosClient::aio_sparse_read(PoolCtx& pool, const object_t oid, off_t off,
+                                 std::map<off_t,size_t> *m, bufferlist *data_bl, size_t len,
+                                 AioCompletion *c)
+{
+  /* the ack context calls c->get() on construction */
+  C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c);
+  onack->m = m;
+  onack->data_bl = data_bl;
+
+  /* results are delivered through m/data_bl, not through c->pbl */
+  c->pbl = NULL;
+
+  Mutex::Locker l(lock);
+  object_locator_t oloc(pool.poolid);
+  objecter->sparse_read(oid, oloc,
+                        off, len, pool.snap_seq, &c->bl, 0,
+                        onack);
+  return 0;
+}
+
int RadosClient::aio_write(PoolCtx& pool, const object_t oid, off_t off, const bufferlist& bl, size_t len,
AioCompletion *c)
{
return r;
}
+/*
+ * Public wrapper around RadosClient::aio_sparse_read().
+ *
+ * Converts the opaque pool handle and the public completion into their
+ * client-side counterparts and forwards the request. Returns -EINVAL when
+ * no client is available, otherwise whatever the client call returns.
+ */
+int Rados::aio_sparse_read(pool_t pool, const std::string& oid, off_t off,
+                           std::map<off_t,size_t> *m, bufferlist *data_bl, size_t len,
+                           AioCompletion *c)
+{
+  if (client) {
+    RadosClient::AioCompletion *impl = (RadosClient::AioCompletion *)c->pc;
+    RadosClient::PoolCtx *pool_ctx = (RadosClient::PoolCtx *)pool;
+    return client->aio_sparse_read(*pool_ctx, oid, off, m, data_bl, len, impl);
+  }
+  return -EINVAL;
+}
+
int Rados::aio_write(rados_pool_t pool, const string& oid, off_t off, const bufferlist& bl, size_t len,
AioCompletion *c)
{
struct AioCompletion *completion;
off_t ofs;
size_t len;
- AioBlockCompletion(AioCompletion *aio_completion, off_t _ofs, size_t _len) :
- completion(aio_completion), ofs(_ofs), len(_len) {}
+ char *buf;
+ map<off_t, size_t> m;
+ bufferlist data_bl;
+
+ AioBlockCompletion(AioCompletion *aio_completion, off_t _ofs, size_t _len, char *_buf) :
+ completion(aio_completion), ofs(_ofs), len(_len), buf(_buf) {}
void complete(int r);
};
+
+
struct AioCompletion {
Mutex lock;
Cond cond;
int ref;
bool released;
- static void rados_cb(rados_completion_t cb, void *arg);
-
- AioCompletion() : lock("RBDClient::AioCompletion::lock"), done(false), rval(0), pending_count(0),
- ref(1), released(false) {}
+ AioCompletion() : lock("RBDClient::AioCompletion::lock"), done(false), rval(0), complete_cb(NULL), complete_arg(NULL), pending_count(0),
+ ref(1), released(false) { cout << "AioCompletion::AioCompletion() this=" << (void *)this << std::endl; }
~AioCompletion() { cout << "AioCompletion::~AioCompletion()" << std::endl; }
int wait_for_complete() {
lock.Lock();
}
};
static void rados_cb(rados_completion_t cb, void *arg);
+ static void rados_aio_sparse_read_cb(rados_completion_t cb, void *arg); // rados callback for sparse reads; arg is the AioBlockCompletion of the request
int initialize(int argc, const char *argv[]);
void shutdown();
int write(PoolCtx *ctx, ImageCtx *ictx, off_t off, size_t len, const char *buf);
int aio_write(PoolCtx *pool, ImageCtx *ictx, off_t off, size_t len, const char *buf,
AioCompletion *c);
+ int aio_read(PoolCtx *ctx, ImageCtx *ictx, off_t off, size_t len, // asynchronous read of len bytes at image offset off, issued as one sparse read per block
+ char *buf, AioCompletion *c); // buf: destination for the data; c: completion that collects the per-block results
AioCompletion *aio_create_completion() { // allocate a completion with no callback set
return new AioCompletion;
}
AioCompletion *aio_create_completion(void *cb_arg, callback_t cb_complete) { // allocate a completion and register cb_complete with cb_arg on it
AioCompletion *c = new AioCompletion;
- if (cb_complete)
- c->set_complete_cb(cb_arg, cb_complete);
+ c->set_complete_cb(cb_arg, cb_complete); // registered unconditionally, so cb_complete may be NULL here
return c;
}
};
if (r < 0)
return r;
bl_ofs += extent_len;
- buf_bl_pos = extent_len;
+ buf_bl_pos += extent_len;
}
/* last hole */
void librbd::RBDClient::AioBlockCompletion::complete(int r)
{
cout << "AioBlockCompletion::complete()" << std::endl;
+ if ((r >= 0 || r == -ENOENT) && buf) { // this was a sparse_read operation
+ map<off_t, size_t>::iterator iter;
+ off_t bl_ofs = 0, buf_bl_pos = 0;
+ cout << "ofs=" << ofs << " len=" << len << std::endl;
+ for (iter = m.begin(); iter != m.end(); ++iter) {
+ off_t extent_ofs = iter->first;
+ size_t extent_len = iter->second;
+
+ cout << "extent_ofs=" << extent_ofs << " extent_len=" << extent_len << std::endl;
+
+ /* a hole? */
+ if (extent_ofs - ofs) {
+ cout << "<1>zeroing " << buf_bl_pos << "~" << extent_ofs << std::endl;
+ cout << "buf=" << (void *)(buf + buf_bl_pos) << "~" << (void *)(buf + len - buf_bl_pos -1) << std::endl;
+ memset(buf + buf_bl_pos, 0, extent_ofs - ofs);
+ }
+
+ if (bl_ofs + extent_len > len) {
+ r = -EIO;
+ break;
+ }
+ buf_bl_pos += extent_ofs - ofs;
+
+ /* data */
+ memcpy(buf + buf_bl_pos, data_bl.c_str() + bl_ofs, extent_len);
+ cout << "copying " << buf_bl_pos << "~" << extent_len << " from ofs=" << bl_ofs << std::endl;
+ bl_ofs += extent_len;
+ buf_bl_pos += extent_len;
+ }
+
+ /* last hole */
+ if (len - buf_bl_pos) {
+ cout << "<2>zeroing " << buf_bl_pos << "~" << len - buf_bl_pos << std::endl;
+ cout << "buf=" << (void *)(buf + buf_bl_pos) << "~" << (void *)(buf + len - buf_bl_pos -1) << std::endl;
+ memset(buf + buf_bl_pos, 0, len - buf_bl_pos);
+ }
+
+ r = len;
+ }
completion->complete_block(this, r);
}
void librbd::RBDClient::AioCompletion::complete_block(AioBlockCompletion *block_completion, int r)
{
- cout << "RBDClient::AioCompletion::complete_block this=" << (void *)this << std::endl;
+ cout << "RBDClient::AioCompletion::complete_block this=" << (void *)this << " complete_cb=" << (void *)complete_cb << std::endl;
lock.Lock();
- if (r < 0 && r != -EEXIST && !rval)
- rval = r;
+ if (rval >= 0) {
+ if (r < 0 && r != -EEXIST)
+ rval = r;
+ else if (r > 0)
+ rval += r;
+ }
assert(pending_count);
int count = --pending_count;
if (!count) {
- complete_cb(this, complete_arg);
+ if (complete_cb)
+ complete_cb(this, complete_arg);
done = true;
cond.Signal();
}
void librbd::RBDClient::rados_cb(rados_completion_t c, void *arg) // rados completion callback: hands the operation's return value to the per-block completion
{
+ cout << "librbd::RBDClient::rados_cb" << std::endl;
AioBlockCompletion *block_completion = (AioBlockCompletion *)arg; // arg is treated as the AioBlockCompletion of the request
block_completion->complete(rados_aio_get_return_value(c));
}
uint64_t block_ofs = get_block_ofs(&ictx->header, off + total_write);
uint64_t write_len = min(block_size - block_ofs, left);
bl.append(buf + total_write, write_len);
- AioBlockCompletion *block_completion = new AioBlockCompletion(c, off, len);
+ AioBlockCompletion *block_completion = new AioBlockCompletion(c, off, len, NULL);
c->add_block_completion(block_completion);
librados::Rados::AioCompletion *rados_completion = rados.aio_create_completion(block_completion, NULL, rados_cb);
r = rados.aio_write(pool->data, oid, block_ofs, bl, write_len, rados_completion);
return r;
}
+/*
+ * Rados callback for sparse reads: fetches the operation's return value
+ * and hands it to the AioBlockCompletion passed as `arg`.
+ */
+void librbd::RBDClient::rados_aio_sparse_read_cb(rados_completion_t c, void *arg)
+{
+  cout << "librbd::RBDClient::rados_aio_sparse_read_cb" << std::endl;
+  AioBlockCompletion *blk = (AioBlockCompletion *)arg;
+  int result = rados_aio_get_return_value(c);
+  blk->complete(result);
+}
+
+/*
+ * Start an asynchronous read of `len` bytes at image offset `off` into `buf`.
+ *
+ * The range is split on block boundaries and one sparse read is issued per
+ * block object. Each request gets its own AioBlockCompletion, which is
+ * registered with `c` and fills its slice of `buf` when the request finishes.
+ *
+ * Returns the number of bytes for which reads were queued, or the negative
+ * error returned when queuing a request failed (-ENOENT is not treated as a
+ * failure). The outcome of the reads themselves is reported through `c`.
+ */
+int librbd::RBDClient::aio_read(PoolCtx *ctx, ImageCtx *ictx, off_t off, size_t len,
+                                char *buf,
+                                AioCompletion *c)
+{
+  int total_read = 0;
+  uint64_t start_block = get_block_num(&ictx->header, off);
+  uint64_t end_block = get_block_num(&ictx->header, off + len);
+  uint64_t block_size = get_block_size(&ictx->header);
+  uint64_t left = len;
+
+  for (uint64_t i = start_block; i <= end_block; i++) {
+    string oid = get_block_oid(&ictx->header, i);
+    uint64_t block_ofs = get_block_ofs(&ictx->header, off + total_read);
+    /* never ask for more than what is still missing from the caller's range */
+    uint64_t read_len = min(block_size - block_ofs, left);
+
+    AioBlockCompletion *block_completion = new AioBlockCompletion(c, block_ofs, read_len, buf + total_read);
+    c->add_block_completion(block_completion);
+
+    /* NOTE(review): both callbacks end up in AioBlockCompletion::complete(); this relies on only one of them firing per request -- confirm */
+    librados::Rados::AioCompletion *rados_completion = rados.aio_create_completion(block_completion, rados_aio_sparse_read_cb, rados_cb);
+    int r = rados.aio_sparse_read(ctx->data, oid, block_ofs,
+                                  &block_completion->m, &block_completion->data_bl,
+                                  read_len, rados_completion);
+    /* NOTE(review): on failure the block completion stays registered with c -- confirm callers cope with that */
+    if (r < 0 && r != -ENOENT)
+      return r;
+
+    total_read += read_len;
+    left -= read_len;
+    /* a range ending exactly on a block boundary must not touch the following block */
+    if (!left)
+      break;
+  }
+  return total_read;
+}
+
/*
RBD
*/
return client->aio_write(ictx->pctx, ictx, off, len, bl.c_str(), (RBDClient::AioCompletion *)c->pc);
}
+/*
+ * Public asynchronous image read.
+ *
+ * Appends a `len`-byte buffer to `bl`, then asks the client to read `len`
+ * bytes at image offset `off` into the memory returned by bl.c_str().
+ * Returns whatever RBDClient::aio_read() returns; completion is reported
+ * through `c`.
+ */
+int librbd::RBD::aio_read(image_t image, off_t off, size_t len, bufferlist& bl,
+                          AioCompletion *c)
+{
+  ImageCtx *ictx = (ImageCtx *)image;
+
+  /* give bl a len-byte buffer for the read to fill */
+  bufferptr ptr(len);
+  bl.push_back(ptr);
+
+  char *dest = bl.c_str();
+  cout << "librbd::RBD::aio_read() buf=" << (void *)dest << "~" << (void *)(dest + len - 1) << std::endl;
+
+  RBDClient::AioCompletion *impl = (RBDClient::AioCompletion *)c->pc;
+  return client->aio_read(ictx->pctx, ictx, off, len, dest, impl);
+}
+
int librbd::RBD::AioCompletion::wait_for_complete()
{
RBDClient::AioCompletion *c = (RBDClient::AioCompletion *)pc;
<< std::endl;
}
-/*
-int read_rbd_info(librbd::pools_t& pp, string& info_oid, struct rbd_info *info)
-{
- int r;
- bufferlist bl;
-
- r = rados.read(pp.md, info_oid, 0, bl, sizeof(*info));
- if (r < 0)
- return r;
- if (r == 0) {
- return init_rbd_info(info);
- }
-
- if (r < (int)sizeof(*info))
- return -EIO;
-
- memcpy(info, bl.c_str(), r);
- return 0;
-}
-
-static int touch_rbd_info(librados::pool_t pool, string& info_oid)
-{
- bufferlist bl;
- int r = rados.write(pool, info_oid, 0, bl, 0);
- if (r < 0)
- return r;
- return 0;
-}
-
-static int rbd_assign_bid(librados::pool_t pool, string& info_oid, uint64_t *id)
-{
- bufferlist bl, out;
-
- *id = 0;
-
- int r = touch_rbd_info(pool, info_oid);
- if (r < 0)
- return r;
-
- r = rados.exec(pool, info_oid, "rbd", "assign_bid", bl, out);
- if (r < 0)
- return r;
-
- bufferlist::iterator iter = out.begin();
- ::decode(*id, iter);
-
- return 0;
-}
-
-
-static int read_header_bl(librados::pool_t pool, string& md_oid, bufferlist& header, uint64_t *ver)
-{
- int r;
-#define READ_SIZE 4096
- do {
- bufferlist bl;
- r = rados.read(pool, md_oid, 0, bl, READ_SIZE);
- if (r < 0)
- return r;
- header.claim_append(bl);
- } while (r == READ_SIZE);
-
- if (ver)
- *ver = rados.get_last_version(pool);
-
- return 0;
-}
-
-static int notify_change(librados::pool_t pool, string& oid, uint64_t *pver)
-{
- uint64_t ver;
- if (pver)
- ver = *pver;
- else
- ver = rados.get_last_version(pool);
- rados.notify(pool, oid, ver);
- return 0;
-}
-
-static int read_header(librados::pool_t pool, string& md_oid, struct rbd_obj_header_ondisk *header, uint64_t *ver)
-{
- bufferlist header_bl;
- int r = read_header_bl(pool, md_oid, header_bl, ver);
- if (r < 0)
- return r;
- if (header_bl.length() < (int)sizeof(*header))
- return -EIO;
- memcpy(header, header_bl.c_str(), sizeof(*header));
-
- return 0;
-}
-
-static int write_header(librbd::pools_t& pp, string& md_oid, bufferlist& header)
-{
- bufferlist bl;
- int r = rados.write(pp.md, md_oid, 0, header, header.length());
-
- notify_change(pp.md, md_oid, NULL);
-
- return r;
-}
-
-static int tmap_set(librbd::pools_t& pp, string& imgname)
-{
- bufferlist cmdbl, emptybl;
- __u8 c = CEPH_OSD_TMAP_SET;
- ::encode(c, cmdbl);
- ::encode(imgname, cmdbl);
- ::encode(emptybl, cmdbl);
- return rados.tmap_update(pp.md, dir_oid, cmdbl);
-}
-
-static int tmap_rm(librbd::pools_t& pp, string& imgname)
-{
- bufferlist cmdbl;
- __u8 c = CEPH_OSD_TMAP_RM;
- ::encode(c, cmdbl);
- ::encode(imgname, cmdbl);
- return rados.tmap_update(pp.md, dir_oid, cmdbl);
-}
-
-static int rollback_image(librbd::pools_t& pp, struct rbd_obj_header_ondisk *header,
- ::SnapContext& snapc, uint64_t snapid)
-{
- uint64_t numseg = get_max_block(header);
-
- for (uint64_t i = 0; i < numseg; i++) {
- int r;
- string oid = get_block_oid(header, i);
- librados::SnapContext sn;
- sn.seq = snapc.seq;
- sn.snaps.clear();
- vector<snapid_t>::iterator iter = snapc.snaps.begin();
- for (; iter != snapc.snaps.end(); ++iter) {
- sn.snaps.push_back(*iter);
- }
- r = rados.selfmanaged_snap_rollback_object(pp.data, oid, sn, snapid);
- if (r < 0 && r != -ENOENT)
- return r;
- }
- return 0;
-}
-*/
static int do_list(librbd::pool_t pool)
{
std::vector<string> names;
{
int r;
librbd::image_info_t info;
+ bufferlist bl;
int fd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0644);
if (fd < 0)
return -errno;
r = rbd.stat(image, info);
if (r < 0)
return r;
-
+#if 0
r = rbd.read_iterate(image, 0, info.size, export_read_cb, (void *)&fd);
if (r < 0)
return r;
+#endif
+ librbd::RBD::AioCompletion *completion = rbd.aio_create_completion(NULL, NULL);
+ if (!completion) {
+ r = -ENOMEM;
+ goto done;
+ }
+ r = rbd.aio_read(image, 0, info.size, bl, completion);
+ completion->wait_for_complete();
+ r = completion->get_return_value();
+ completion->release();
+ if (r < 0) {
+ cerr << "error writing to image block" << std::endl;
+ goto done;
+ }
+ r = write(fd, bl.c_str(), bl.length());
+ if (r < 0)
+ return r;
r = ftruncate(fd, info.size);
if (r < 0)
return r;
-
+done:
close(fd);
return 0;
if (r < 0)
goto done;
completion->wait_for_complete();
+ r = completion->get_return_value();
completion->release();
if (r < 0) {
cerr << "error writing to image block" << std::endl;