int rados_initialize(int argc, const char **argv); /* arguments are optional */
void rados_deinitialize();
+typedef void *rados_list_ctx_t; /* opaque listing cursor; must be NULL on the first rados_list() call */
+
/* pools */
typedef int rados_pool_t;
int rados_open_pool(const char *name, rados_pool_t *pool);
int rados_close_pool(rados_pool_t pool);
-int rados_list(rados_pool_t pool);
+int rados_list(rados_pool_t pool, int max, struct ceph_object *entries, rados_list_ctx_t *ctx); /* copies listed objects into the caller-supplied entries array; *ctx is allocated on demand and freed/reset to NULL when a call yields no entries */
/* read/write objects */
int rados_write(rados_pool_t pool, struct ceph_object *oid, off_t off, const char *buf, size_t len);
int open_pool(const char *name, rados_pool_t *pool);
int close_pool(rados_pool_t pool);
- int list(rados_pool_t pool, vector<string>& entries);
int write(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
int read(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
int exec(rados_pool_t pool, object_t& oid, const char *cls, const char *method,
bufferlist& inbl, bufferlist& outbl);
-
+ /* Opaque cursor handed to list(); it starts out with no position attached. */
+ struct ListCtx {
+ void *ctx;
+ ListCtx() { ctx = NULL; }
+ };
+
+ int list(rados_pool_t pool, int max, vector<ceph_object>& entries, Rados::ListCtx& ctx); // appends listed objects to entries; ctx carries the listing position between calls
+
// -- aio --
struct AioCompletion {
void *pc;
__u8 argc;
__le32 indata_len;
} __attribute__ ((packed));
+ struct {
+ __le64 pgls_cookie, count;
+ };
};
} __attribute__ ((packed));
#include "mon/MonMap.h"
#include "mds/MDS.h"
#include "osd/OSDMap.h"
+#include "osd/PGLS.h"
#include "msg/SimpleMessenger.h"
return osdmap.lookup_pg_pool_name(name);
}
- int list(int pool, vector<string>& entries);
-
int write(int pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
int read(int pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
int remove(int pool, object_t& oid);
int exec(int pool, object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
+ /* Listing position: seed indexes the PG currently being listed, cookie is the resume handle sent with each pg_ls request. */
+ struct PGLSOp {
+ int seed;
+ __u64 cookie;
+
+ PGLSOp() { seed = 0; cookie = 0; }
+ };
+
+ int list(int pool, int max_entries, vector<ceph_object>& entries, RadosClient::PGLSOp& op); // op is advanced in place so a later call continues from where this one stopped
// --- aio ---
struct AioCompletion {
return true;
}
-int RadosClient::list(int pool, vector<string>& entries)
+/*
+ * List objects of a pool, one placement group at a time.
+ *
+ * Walks the PGs from op.seed upwards and issues pg_ls requests of at most
+ * MAX_REQ_SIZE entries each, appending every returned object to entries.
+ * When max_entries objects have been collected the function returns with
+ * op.seed/op.cookie left at the current position, so the next call resumes
+ * there.  Returns the result code of the last read; a negative result is
+ * returned immediately without touching the reply buffer.
+ */
+int RadosClient::list(int pool, int max_entries, vector<ceph_object>& entries, RadosClient::PGLSOp& op)
{
 SnapContext snapc;
 utime_t ut = g_clock.now();
 memset(&oid, 0, sizeof(oid));
- dout(0) << hex << "pool=" << dec << pool << dendl;
+ dout(0) << hex << "pool=" << dec << pool << " op.cookie=" << op.cookie << dendl;
 ceph_object_layout layout;
retry:
 int pg_num = objecter->osdmap->get_pg_num(pool);
- for (int i=0; i<pg_num; i++) {
- int num = objecter->osdmap->get_pg_layout(pool, i, layout);
- if (num != pg_num) /* ahh.. race! */
- goto retry;
+ for (;op.seed <pg_num; op.seed++) {
+ int response_size;
+ int req_size;
- lock.Lock();
+ do {
+ int num = objecter->osdmap->get_pg_layout(pool, op.seed, layout);
+ if (num != pg_num) /* ahh.. race! */
+ goto retry;
- ObjectRead rd;
- bufferlist bl;
- rd.pg_ls(0, 1024);
- Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
- objecter->read(oid, layout, rd, CEPH_NOSNAP, &bl, 0, onack);
+ lock.Lock();
- while (!done)
- cond.Wait(lock);
+ ObjectRead rd;
+ bufferlist bl;
+#define MAX_REQ_SIZE 1024
+ req_size = min(MAX_REQ_SIZE, max_entries);
+ rd.pg_ls(req_size, op.cookie);
- lock.Unlock();
- dout(0) << "after pg_ls(" << i << ") r=" << r << " got " << bl.length() << " bytes ol_pgls=" << hex << layout.ol_pgid << dec << dendl;
+ Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
+ objecter->read(oid, layout, rd, CEPH_NOSNAP, &bl, 0, onack);
- vector<pobject_t> ls;
- bufferlist::iterator iter = bl.begin();
- ::decode(ls, iter);
- if (ls.size() > 0) {
- vector<pobject_t>::iterator ls_iter;
+ while (!done)
+ cond.Wait(lock);
- for (ls_iter = ls.begin(); ls_iter != ls.end(); ++ls_iter) {
- dout(0) << "entry: " << *ls_iter << dendl;
+ lock.Unlock();
+ dout(0) << "after pg_ls(" << op.seed << ") r=" << r << " got " << bl.length() << " bytes ol_pgls=" << hex << layout.ol_pgid << dec << dendl;
+ dout(0) << "max_entries=" << max_entries << dendl;
+
+ /* a failed request carries no encoded response; report the error instead of decoding */
+ if (r < 0)
+ return r;
+
+ bufferlist::iterator iter = bl.begin();
+ PGLSResponse response;
+ ::decode(response, iter);
+ op.cookie = (__u64)response.handle;
+ response_size = response.entries.size();
+ if (response_size) {
+ vector<pobject_t>::iterator ls_iter;
+
+ for (ls_iter = response.entries.begin(); ls_iter != response.entries.end(); ++ls_iter) {
+ dout(0) << "entry: " << *ls_iter << dendl;
+ ceph_object obj = (ceph_object)ls_iter->oid;
+ entries.push_back(obj);
+ }
+ max_entries -= response_size;
+ dout(0) << "op.cookie=" << op.cookie << dendl;
+
+ if (!max_entries)
+ return r;
+ } else {
+ op.cookie = 0;
 }
- }
+ } while ((response_size == req_size) && op.cookie);
+ /* this PG is exhausted: its cookie means nothing to the next PG, so start that one from the beginning */
+ op.cookie = 0;
 }
 return r;
return client->init();
}
-int Rados::list(rados_pool_t pool, vector<string>& entries)
+/*
+ * Append up to max objects from pool to entries, resuming from the position
+ * stored in ctx.  The cursor is allocated on first use and released again,
+ * with ctx.ctx reset to NULL, once a successful call adds no entries, so a
+ * caller that loops until nothing more is returned does not leak it.
+ * Returns -EINVAL when no client is set up, otherwise the client's result.
+ */
+int Rados::list(rados_pool_t pool, int max, vector<ceph_object>& entries, Rados::ListCtx& ctx)
{
 if (!client)
 return -EINVAL;
- return client->list(pool, entries);
+ RadosClient::PGLSOp *op;
+ if (!ctx.ctx) {
+ ctx.ctx = new RadosClient::PGLSOp;
+ if (!ctx.ctx)
+ return -ENOMEM;
+ }
+
+ op = (RadosClient::PGLSOp *)ctx.ctx;
+
+ /* entries may already hold results from earlier calls, so compare sizes rather than test for empty */
+ size_t before = entries.size();
+ int ret = client->list(pool, max, entries, *op);
+ if (ret >= 0 && entries.size() == before) {
+ /* nothing left to list: free the cursor here, as rados_list() does for the C API */
+ delete op;
+ ctx.ctx = NULL;
+ }
+ return ret;
}
int Rados::write(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len)
return ret;
}
-extern "C" int rados_list(rados_pool_t pool)
+/*
+ * C entry point for object listing.
+ *
+ * Copies at most max objects into the caller-supplied entries array.  *ctx
+ * must be NULL on the first call; the cursor is allocated here and is freed,
+ * with *ctx reset to NULL, when a call produces no entries.
+ * Returns -EINVAL for a NULL ctx, a negative max, or a NULL entries array
+ * with max > 0; otherwise the result of the underlying list call.
+ * NOTE(review): the number of entries written is not reported to the caller.
+ */
+extern "C" int rados_list(rados_pool_t pool, int max, struct ceph_object *entries, rados_list_ctx_t *ctx)
{
 int ret;
- vector<string> entries;
- ret = radosp->list(pool, entries);
+
+ /* reject arguments that would be dereferenced or turned into a huge request count */
+ if (!ctx || max < 0 || (max > 0 && !entries))
+ return -EINVAL;
+
+ if (!*ctx) {
+ *ctx = new RadosClient::PGLSOp;
+ if (!*ctx)
+ return -ENOMEM;
+ }
+ RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*ctx;
+
+ vector<ceph_object> vec;
+ ret = radosp->list(pool, max, vec, *op);
+ if (!vec.size()) {
+ delete op;
+ *ctx = NULL;
+ }
+
+ /* never write past the max slots the caller provided, even if more came back */
+ size_t count = vec.size();
+ if (count > (size_t)max)
+ count = (size_t)max;
+ for (size_t i = 0; i < count; i++) {
+ entries[i] = vec[i];
+ }
 return ret;
}
// first, build (ino, object) list
vector< pair<ino_t,pobject_t> > inolist;
- if (handle)
- dir = *(DIR **)handle;
- if (!dir)
- dir = ::opendir(fn);
+ dir = ::opendir(fn);
if (!dir) {
dout(0) << "error opening directory " << fn << dendl;
return -errno;
}
+ if (handle) {
+ dout(0) << "seeking to position " << *(off_t *)handle << dendl;
+ seekdir(dir, *(off_t *)handle);
+ }
+
for (int i=0; i<max_count; i++) {
errno = 0;
de = ::readdir(dir);
break;
// parse
- if (de->d_name[0] == '.') continue;
+ if (de->d_name[0] == '.') {
+ i--;
+ continue;
+ }
//cout << " got object " << de->d_name << std::endl;
+
+ dout(0) << "readdir: " << de->d_name << dendl;
pobject_t o;
if (parse_object(de->d_name, o)) {
inolist.push_back(pair<ino_t,pobject_t>(de->d_ino, o));
}
}
- if (!handle || !de)
- ::closedir(dir);
+ if (handle) {
+ *handle = (collection_list_handle_t)telldir(dir);
+ dout(0) << "returning handle=" << (__u64)*handle << dendl;
+ }
- if (handle)
- *handle = (collection_list_handle_t)dir;
+ ::closedir(dir);
// build final list
ls.resize(inolist.size());
# define MIN(a,b) ((a) < (b) ? (a):(b))
#endif
-typedef void *collection_list_handle_t;
+typedef __u64 collection_list_handle_t; /* integer listing position (filled from telldir()) instead of a DIR pointer */
/*
* low-level interface to the local OSD file system
--- /dev/null
+#ifndef __PGLS_H
+#define __PGLS_H
+
+
+#include "include/types.h"
+#include "os/ObjectStore.h"
+
+/*
+ * Payload of a pg_ls reply: the objects found plus the handle the client
+ * sends back as the cookie of its next request.  The handle is encoded as
+ * a __u64 on the wire.
+ */
+struct PGLSResponse {
+ collection_list_handle_t handle;
+ vector<pobject_t> entries;
+
+ /* zero the handle so a default-constructed response never encodes an indeterminate value */
+ PGLSResponse() : handle(0) {}
+
+ void encode(bufferlist& bl) const {
+ ::encode((__u64)handle, bl);
+ ::encode(entries, bl);
+ }
+ void decode(bufferlist::iterator& bl) {
+ __u64 tmp;
+ ::decode(tmp, bl);
+ handle = (collection_list_handle_t)tmp;
+ ::decode(entries, bl);
+ }
+};
+
+WRITE_CLASS_ENCODER(PGLSResponse)
+
+
+#endif
#include "ReplicatedPG.h"
#include "OSD.h"
+#include "PGLS.h"
#include "common/arch.h"
#include "common/Logger.h"
{
dout(10) << " pgls pg=" << op->get_pg() << dendl;
// read into a buffer
- vector<pobject_t> vec;
- collection_list_handle_t handle = NULL;
- result = osd->store->collection_list_partial(op->get_pg().to_coll(), vec, p->length, &handle);
+ PGLSResponse response;
+ response.handle = (collection_list_handle_t)(__u64)(p->pgls_cookie);
+ result = osd->store->collection_list_partial(op->get_pg().to_coll(), response.entries, p->length, &response.handle);
if (!result) {
-#if 0
- ctx->data_off = op.offset;
-#endif
- ::encode(vec, outdata);
+ ::encode(response, outdata);
}
dout(10) << " pgls result=" << result << " outdata.length()=" << outdata.length() << dendl;
}
}
break;
- case CEPH_OSD_OP_PGLS:
- {
- // read into a buffer
- vector<pobject_t> vec;
- collection_list_handle_t handle = NULL;
- int r = osd->store->collection_list_partial(info.pgid.to_coll(), vec, op.length, &handle);
- if (!r) {
- ctx->data_off = op.offset;
- ::encode(vec, odata);
- } else {
- result = r;
- }
- dout(10) << " read got " << r << " / " << op.length << " bytes from obj " << soid << dendl;
- }
- break;
-
case CEPH_OSD_OP_RDCALL:
{
string cname, mname;
data.append(method, ops[s].method_len);
data.append(indata);
}
+ /* Append one zero-filled op slot and fill in the listing fields: opcode, entry count and resume cookie. */
+ void add_pgls(int op, __u64 count, __u64 cookie) {
+ const int idx = ops.size();
+ ops.resize(idx + 1);
+ memset(&ops[idx], 0, sizeof(ops[idx]));
+ ops[idx].pgls_cookie = cookie;
+ ops[idx].count = count;
+ ops[idx].op = op;
+ }
ObjectOperation() : flags(0) {}
};
add_call(CEPH_OSD_OP_RDCALL, cname, method, indata);
}
- void pg_ls(__u64 off, __u64 len) {
- add_data(CEPH_OSD_OP_PGLS, off, len);
+ void pg_ls(__u64 count, __u64 cookie) { // queue a PG listing op: count = entries requested, cookie = resume position from the previous reply (0 to start)
+ add_pgls(CEPH_OSD_OP_PGLS, count, cookie); // stored in the op's count / pgls_cookie fields
 flags |= CEPH_OSD_FLAG_PGOP;
 }
};
object_t oid;
memset(&oid, 0, sizeof(oid));
- oid.ino = 0x2010;
+ oid.ino = 0x2020;
rados_pool_t pool;
int r = rados.open_pool("data", &pool);
cout << "read result=" << bl2.c_str() << std::endl;
cout << "size=" << size << std::endl;
- vector<string> vec;
- r = rados.list(pool, vec);
- cout << "read result=" << r << " pool=" << hex << pool << dec << std::endl;
+ Rados::ListCtx ctx;
+ int entries;
+ do {
+ vector<ceph_object> vec;
+ r = rados.list(pool, 1, vec, ctx);
+ entries = vec.size();
+ cout << "list result=" << r << " entries=" << entries << std::endl;
+ vector<ceph_object>::iterator iter;
+ for (iter = vec.begin(); iter != vec.end(); ++iter) {
+ cout << *iter << std::endl;
+ }
+ } while (entries);
#if 0
r = rados.remove(pool, oid);
cout << "remove result=" << r << std::endl;