typedef int rados_pool_t;
int rados_open_pool(const char *name, rados_pool_t *pool);
int rados_close_pool(rados_pool_t pool);
-int rados_list(rados_pool_t pool, int max, struct ceph_object *entries, rados_list_ctx_t *ctx);
++int rados_list(rados_pool_t pool, int max, char *entries, rados_list_ctx_t *ctx);
/* read/write objects */
-int rados_write(rados_pool_t pool, struct ceph_object *oid, off_t off, const char *buf, size_t len);
-int rados_read(rados_pool_t pool, struct ceph_object *oid, off_t off, char *buf, size_t len);
-int rados_remove(rados_pool_t pool, struct ceph_object *oid);
-int rados_exec(rados_pool_t pool, struct ceph_object *o, const char *cls, const char *method,
+int rados_write(rados_pool_t pool, const char *oid, off_t off, const char *buf, size_t len);
+int rados_read(rados_pool_t pool, const char *oid, off_t off, char *buf, size_t len);
+int rados_remove(rados_pool_t pool, const char *oid);
+int rados_exec(rados_pool_t pool, const char *oid, const char *cls, const char *method,
const char *in_buf, size_t in_len, char *buf, size_t out_len);
/* async io */
int open_pool(const char *name, rados_pool_t *pool);
int close_pool(rados_pool_t pool);
- int write(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
- int read(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
- int remove(rados_pool_t pool, object_t& oid);
+ int write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+ int read(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+ int remove(rados_pool_t pool, const object_t& oid);
- int exec(rados_pool_t pool, object_t& oid, const char *cls, const char *method,
+ int exec(rados_pool_t pool, const object_t& oid, const char *cls, const char *method,
bufferlist& inbl, bufferlist& outbl);
-
+ struct ListCtx {
+ void *ctx;
+ ListCtx() : ctx(NULL) {}
+ };
+
- int list(rados_pool_t pool, int max, vector<ceph_object>& entries, Rados::ListCtx& ctx);
++ int list(rados_pool_t pool, int max, vector<object_t>& entries, Rados::ListCtx& ctx);
+
// -- aio --
struct AioCompletion {
void *pc;
return osdmap.lookup_pg_pool_name(name);
}
- int write(int pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
- int read(int pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
- int remove(int pool, object_t& oid);
+ int write(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+ int read(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+ int remove(int pool, const object_t& oid);
- int exec(int pool, object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
+ int exec(int pool, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
- int list(int pool, int max_entries, vector<ceph_object>& entries, RadosClient::PGLSOp& op);
+ struct PGLSOp {
+ int seed;
+ __u64 cookie;
+
+ PGLSOp() : seed(0), cookie(0) {}
+ };
+
++ int list(int pool, int max_entries, vector<object_t>& entries, RadosClient::PGLSOp& op);
// --- aio ---
struct AioCompletion {
return true;
}
-int RadosClient::list(int pool, int max_entries, vector<ceph_object>& entries, RadosClient::PGLSOp& op)
++int RadosClient::list(int pool, int max_entries, vector<object_t>& entries, RadosClient::PGLSOp& op)
+ {
+ SnapContext snapc;
+ utime_t ut = g_clock.now();
+
+ Mutex lock("RadosClient::list");
+ Cond cond;
+ bool done;
+ int r;
+ object_t oid;
+
+ memset(&oid, 0, sizeof(oid));
+
+ ceph_object_layout layout;
+ retry:
+ int pg_num = objecter->osdmap->get_pg_num(pool);
+
+ for (;op.seed <pg_num; op.seed++) {
+ int response_size;
+ int req_size;
+
+ do {
+ int num = objecter->osdmap->get_pg_layout(pool, op.seed, layout);
+ if (num != pg_num) /* ahh.. race! */
+ goto retry;
+
+ lock.Lock();
+
+ ObjectRead rd;
+ bufferlist bl;
+ #define MAX_REQ_SIZE 1024
+ req_size = min(MAX_REQ_SIZE, max_entries);
+ rd.pg_ls(req_size, op.cookie);
+
+ Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
+ objecter->read(oid, layout, rd, CEPH_NOSNAP, &bl, 0, onack);
+
+ while (!done)
+ cond.Wait(lock);
+
+ lock.Unlock();
+
+ bufferlist::iterator iter = bl.begin();
+ PGLSResponse response;
+ ::decode(response, iter);
+ op.cookie = (__u64)response.handle;
+ response_size = response.entries.size();
+ if (response_size) {
- vector<pobject_t>::iterator ls_iter;
-
- for (ls_iter = response.entries.begin(); ls_iter != response.entries.end(); ++ls_iter) {
- ceph_object obj = (ceph_object)ls_iter->oid;
- entries.push_back(obj);
- }
++ entries.swap(response.entries);
+ max_entries -= response_size;
-
+ if (!max_entries)
+ return r;
+ } else {
+ op.cookie = 0;
+ }
+ } while ((response_size == req_size) && op.cookie);
+ }
+
+ return r;
+ }
+
-int RadosClient::write(int pool, object_t& oid, off_t off, bufferlist& bl, size_t len)
+int RadosClient::write(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
{
SnapContext snapc;
utime_t ut = g_clock.now();
return client->init();
}
-int Rados::list(rados_pool_t pool, int max, vector<ceph_object>& entries, Rados::ListCtx& ctx)
++int Rados::list(rados_pool_t pool, int max, vector<object_t>& entries, Rados::ListCtx& ctx)
+ {
+ if (!client)
+ return -EINVAL;
+
+ RadosClient::PGLSOp *op;
+ if (!ctx.ctx) {
+ ctx.ctx = new RadosClient::PGLSOp;
+ if (!ctx.ctx)
+ return -ENOMEM;
+ }
+
+ op = (RadosClient::PGLSOp *)ctx.ctx;
+
-
+ return client->list(pool, max, entries, *op);
+ }
+
-int Rados::write(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len)
+int Rados::write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
{
if (!client)
return -EINVAL;
return ret;
}
-extern "C" int rados_list(rados_pool_t pool, int max, struct ceph_object *entries, rados_list_ctx_t *ctx)
++extern "C" int rados_list(rados_pool_t pool, int max, char *entries, rados_list_ctx_t *ctx)
+ {
+ int ret;
+
+ if (!*ctx) {
+ *ctx = new RadosClient::PGLSOp;
+ if (!*ctx)
+ return -ENOMEM;
+ }
+ RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*ctx;
+
- vector<ceph_object> vec;
++ vector<object_t> vec;
+ ret = radosp->list(pool, max, vec, *op);
+ if (!vec.size()) {
+ delete op;
+ *ctx = NULL;
+ }
+
- for (int i=0; i<vec.size(); i++) {
++#warning fixme
++ /*for (int i=0; i<vec.size(); i++) {
+ entries[i] = vec[i];
+ }
++ */
+
+ return ret;
+ }
+
// -------------------------
char fn[PATH_MAX];
get_cdir(c, fn);
- dout(10) << "collection_list " << fn << dendl;
- DIR *dir = ::opendir(fn);
- if (!dir)
- return -errno;
+ DIR *dir = NULL;
+ struct dirent *de;
// first, build (ino, object) list
- vector< pair<ino_t,pobject_t> > inolist;
+ vector< pair<ino_t,sobject_t> > inolist;
- struct dirent *de;
- if (handle)
- de = *(struct dirent **)handle;
+
+ dir = ::opendir(fn);
+
+ if (!dir) {
+ dout(0) << "error opening directory " << fn << dendl;
+ return -errno;
+ }
+
+ if (handle) {
+ seekdir(dir, *(off_t *)handle);
+ }
+
for (int i=0; i<max_count; i++) {
- int ret = ::readdir_r(dir, de, &de);
- if (ret) {
- dout(0) << "error reading directory" << dendl;
+ errno = 0;
+ de = ::readdir(dir);
+ if (!de && errno) {
+ dout(0) << "error reading directory " << fn << dendl;
return -errno;
}
- if (!de) {
+ if (!de)
break;
- }
+
// parse
- if (de->d_name[0] == '.') continue;
+ if (de->d_name[0] == '.') {
+ i--;
+ continue;
+ }
//cout << " got object " << de->d_name << std::endl;
-
- pobject_t o;
+ sobject_t o;
if (parse_object(de->d_name, o)) {
- inolist.push_back(pair<ino_t,pobject_t>(de->d_ino, o));
+ inolist.push_back(pair<ino_t,sobject_t>(de->d_ino, o));
ls.push_back(o);
}
}
--- /dev/null
- vector<pobject_t> entries;
+ #ifndef __PGLS_H
+ #define __PGLS_H
+
+
+ #include "include/types.h"
+ #include "os/ObjectStore.h"
+
+ struct PGLSResponse {
+ collection_list_handle_t handle;
++ vector<object_t> entries;
+
+ void encode(bufferlist& bl) const {
+ ::encode((__u64)handle, bl);
+ ::encode(entries, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ __u64 tmp;
+ ::decode(tmp, bl);
+ handle = (collection_list_handle_t)tmp;
+ ::decode(entries, bl);
+ }
+ };
+
+ WRITE_CLASS_ENCODER(PGLSResponse)
+
+
+ #endif
for (vector<ceph_osd_op>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
switch (p->op) {
-
case CEPH_OSD_OP_PGLS:
{
- vector<sobject_t> pobjects;
- // ???
- vector<object_t> objects;
- // ???
- ::encode(objects, outdata);
+ dout(10) << " pgls pg=" << op->get_pg() << dendl;
+ // read into a buffer
+ PGLSResponse response;
+ response.handle = (collection_list_handle_t)(__u64)(p->pgls_cookie);
- result = osd->store->collection_list_partial(op->get_pg().to_coll(), response.entries, p->length, &response.handle);
++ //result = osd->store->collection_list_partial(op->get_pg().to_coll(), response.entries, p->length,
++ //&response.handle);
++#warning fixme
+ if (!result) {
+ ::encode(response, outdata);
+ }
+ dout(10) << " pgls result=" << result << " outdata.length()=" << outdata.length() << dendl;
}
break;
rd->pbl = pbl;
return read_submit(rd);
}
- tid_t read(object_t oid, ceph_object_layout ol,
+ tid_t read(const object_t& oid, ceph_object_layout ol,
ObjectRead& read, snapid_t snap, bufferlist *pbl, int flags, Context *onfinish) {
- ReadOp *rd = new ReadOp(oid, ol, read.ops, snap, flags, onfinish);
+ ReadOp *rd = new ReadOp(oid, ol, read.ops, snap, read.flags | flags, onfinish);
rd->bl = read.data;
rd->pbl = pbl;
return read_submit(rd);
cout << "read result=" << bl2.c_str() << std::endl;
cout << "size=" << size << std::endl;
- vector<ceph_object> vec;
+ Rados::ListCtx ctx;
+ int entries;
+ do {
- vector<ceph_object>::iterator iter;
++ vector<object_t> vec;
+ r = rados.list(pool, 1, vec, ctx);
+ entries = vec.size();
+ cout << "list result=" << r << " entries=" << entries << std::endl;
++ vector<object_t>::iterator iter;
+ for (iter = vec.begin(); iter != vec.end(); ++iter) {
+ cout << *iter << std::endl;
+ }
+ } while (entries);
+ #if 0
r = rados.remove(pool, oid);
cout << "remove result=" << r << std::endl;
rados.close_pool(pool);