int exec(PoolCtx& pool, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
- struct PGLSOp {
- int seed;
- __u64 cookie;
- std::list<object_t> list;
- std::list<object_t>::iterator iter;
- __u64 pos;
- __u64 total;
-
- PGLSOp() : seed(0), cookie(0), pos(0), total(0) {}
- };
int list_pools(std::vector<string>& ls);
int get_pool_stats(std::vector<string>& ls, map<string,rados_pool_stat_t>& result);
int get_fs_stats( rados_statfs_t& result );
- int list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op);
+ int list(PoolCtx& pool, int max_entries, std::list<object_t>& entries,
+ Objecter::ListContext *context);
// --- aio ---
struct AioCompletion {
// IO
-int RadosClient::list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op)
-{
- utime_t ut = g_clock.now();
-
+int RadosClient::list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, Objecter::ListContext *context) { // Blocking wrapper: hands the listing to Objecter::list_objects and waits on a condition variable until the completion reports done, then returns r.
Cond cond; // waited on below; its address is given to the C_SafeCond completion
bool done; // loop flag for the wait below. NOTE(review): not initialised here -- presumably C_SafeCond's constructor clears it; confirm
int r = 0; // return value; its address is given to the C_SafeCond completion
object_t oid; // NOTE(review): no longer referenced by the lines this patch keeps or adds
+ Mutex mylock("RadosClient::list::mylock"); // protects the done/cond hand-off between this thread and the completion
- memset(&oid, 0, sizeof(oid));
- entries.clear();
-
- ceph_object_layout layout;
-retry:
- int pg_num = objecter->osdmap->get_pg_num(pool.poolid);
-
- for (;op.seed <pg_num; op.seed++) {
- int response_size;
- int req_size;
-
- do {
- lock.Lock();
- int num = objecter->osdmap->get_pg_layout(pool.poolid, op.seed, layout);
- lock.Unlock();
- if (num != pg_num) /* ahh.. race! */
- goto retry;
-
- ObjectOperation rd;
- bufferlist bl;
-#define MAX_REQ_SIZE 1024
- req_size = min(MAX_REQ_SIZE, max_entries);
- rd.pg_ls(req_size, op.cookie);
-
- Mutex mylock("RadosClient::list::mylock");
- Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+ lock.Lock(); // the client lock is held only while the request is handed to the objecter
+ objecter->list_objects(pool.poolid, pool.snap_seq, max_entries, entries, context, // context carries the listing position between calls
+ new C_SafeCond(&mylock, &cond, &done, &r)); // completion is heap-allocated; list_objects/_list_reply call finish() on it and delete it
+ lock.Unlock();
- lock.Lock();
- objecter->read(oid, layout, rd, pool.snap_seq, &bl, 0, onack);
- lock.Unlock();
+ mylock.Lock();
+ while(!done) // block until the completion has run (presumably it sets done and signals cond -- confirm against C_SafeCond)
+ cond.Wait(mylock);
+ mylock.Unlock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
- bufferlist::iterator iter = bl.begin();
- PGLSResponse response;
- ::decode(response, iter);
- op.cookie = (__u64)response.handle;
- response_size = response.entries.size();
- if (response_size) {
- entries.merge(response.entries);
- max_entries -= response_size;
- if (!max_entries)
- return r;
- } else {
- op.cookie = 0;
- }
- } while ((response_size == req_size) && op.cookie);
- }
-
return r;
}
if (!client)
return -EINVAL;
- RadosClient::PGLSOp *op;
+ Objecter::ListContext *op;
if (!ctx.ctx) {
- ctx.ctx = new RadosClient::PGLSOp;
+ ctx.ctx = new Objecter::ListContext();
if (!ctx.ctx)
return -ENOMEM;
}
- op = (RadosClient::PGLSOp *)ctx.ctx;
-
- return client->list(*(RadosClient::PoolCtx *)pool, max, entries, *op);
+ op = (Objecter::ListContext *) ctx.ctx;
+ return client->list(*(RadosClient::PoolCtx *)pool, max, entries, op);
}
int Rados::write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
extern "C" void rados_pool_close_ctx(rados_list_ctx_t *ctx)
{
if (*ctx) {
- RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*ctx;
+ Objecter::ListContext *op = (Objecter::ListContext *)*ctx;
delete op;
*ctx = NULL;
}
RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
if (!*listctx) {
- *listctx = new RadosClient::PGLSOp;
+ *listctx = new Objecter::ListContext;
if (!*listctx)
return -ENOMEM;
}
- RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*listctx;
+ Objecter::ListContext *op = (Objecter::ListContext *)*listctx;
if (op->pos == op->total) {
op->list.clear();
#define MAX_ENTRIES 1024
- ret = radosp->list(*ctx, MAX_ENTRIES, op->list, *op);
+ ret = radosp->list(*ctx, MAX_ENTRIES, op->list, op);
if (!op->list.size()) {
delete op;
*listctx = NULL;
#include "Objecter.h"
#include "osd/OSDMap.h"
+#include "osd/PGLS.h"
#include "mon/MonMap.h"
#include "msg/Messenger.h"
#include "config.h"
+#define MAX_REQ_SIZE 1024
#define DOUT_SUBSYS objecter
#undef dout_prefix
#define dout_prefix *_dout << dbeginl << messenger->get_myname() << ".objecter "
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
+ dout(10) << "in handle_osd_op_reply" << dendl;
// get pio
tid_t tid = m->get_tid();
delete m;
}
+
+// Issue one pg_ls read against the placement group selected by
+// list_context->seed, asking for at most min(MAX_REQ_SIZE, max_entries)
+// objects starting at list_context->cookie.  The reply is handled by
+// _list_reply() through a C_List completion, which either re-enters this
+// function or completes onfinish.
+//
+//   pool_id, pool_snap_seq  pool and snapshot sequence to read from
+//   max_entries             number of objects the caller still wants
+//   entries                 caller-owned result list; cleared here unless this
+//                           call is a continuation (list_context->in_loop)
+//   list_context            caller-owned cursor (seed, cookie, starting_pg_num)
+//   onfinish                completed with finish(0) and deleted when the
+//                           listing is over
+void Objecter::list_objects(int pool_id, int pool_snap_seq, int max_entries,
+  std::list<object_t>& entries, ListContext * list_context, Context *onfinish) {
+
+  dout(10) << "list_objects" << dendl;
+  dout(20) << "pool_id " << pool_id
+           << "\npool_snap_seq " << pool_snap_seq
+           << "\nmax_entries " << max_entries
+           << "\n&entries " << &entries
+           << "\nlist_context " << list_context
+           << "\nonfinish " << onfinish
+           << "\nlist_context->seed" << list_context->seed
+           << "\nlist_context->cookie" << list_context->cookie << dendl;
+
+  // A fresh call from the client starts with an empty result list; a
+  // continuation from _list_reply() keeps what has been gathered so far.
+  if (!list_context->in_loop)
+    entries.clear();
+
+  ceph_object_layout layout;
+  int request_size;
+
+  object_t oid;
+  memset(&oid, 0, sizeof(oid)); //the read we perform later doesn't refer to an object,
+                                //so make sure nothing can think it does
+
+  int pg_num = osdmap->get_pg_layout(pool_id, list_context->seed, layout);
+  if (list_context->starting_pg_num == 0) {// there can't be zero pgs!
+    list_context->starting_pg_num = pg_num;
+    dout(20) << pg_num << " placement groups" << dendl;
+  }
+  if (list_context->starting_pg_num != pg_num) { //start reading from the beginning; the pgs have changed
+    dout(10) << "The placement groups have changed, restarting with " << pg_num << dendl;
+    list_context->seed = 0;
+    list_context->cookie = 0;
+    list_context->starting_pg_num = pg_num;
+    osdmap->get_pg_layout(pool_id, list_context->seed, layout);
+  }
+  if (list_context->seed == pg_num) { //this context got all the way through
+    onfinish->finish(0);
+    delete onfinish;
+    // onfinish has been completed and freed: stop here instead of falling
+    // through, which would wrap the dangling pointer in a C_List and send
+    // another read whose reply completes it a second time.
+    return;
+  }
+
+  ObjectOperation op;
+  request_size = min(MAX_REQ_SIZE, max_entries);
+  op.pg_ls(request_size, list_context->cookie);
+
+  // The C_List owns the reply buffer and remembers everything _list_reply()
+  // needs in order to continue or finish the listing.
+  C_List * onack = new C_List(pool_id, pool_snap_seq, max_entries, request_size,
+                              &entries, list_context, onfinish, this);
+  read(oid, layout, op, pool_snap_seq, &onack->bl, 0,(Context *)onack);
+}
+
+// Reply handler for the pg_ls read sent by list_objects().  cxt is the C_List
+// that was attached to that read.  Decodes the PGLSResponse, merges its
+// objects into the caller's list and then does one of three things:
+//   - completes final_finish when max_entries has dropped to zero,
+//   - re-issues list_objects() on the same placement group when the reply was
+//     full and the OSD returned a non-zero handle,
+//   - otherwise advances to the next placement group, completing final_finish
+//     when there is none left.
+// The C_List itself is never deleted here (the deletes are commented out);
+// presumably whoever invoked finish() on it disposes of it -- confirm.
+void Objecter::_list_reply(void *cxt)
+{
+  dout(10) << "_list_reply" << dendl;
+  C_List * context = (C_List *) cxt;
+  bufferlist::iterator iter = context->bl.begin();
+  PGLSResponse response;
+  ::decode(response, iter);
+  context->list_context->cookie = (__u64)response.handle;
+  int response_size = response.entries.size();
+  dout(20) << "response.entries.size " << response_size
+           <<"\nresponse.entries " << response.entries << dendl;
+  if (response_size) {
+    dout(20) << "got a response with objects, proceeding" << dendl;
+    context->entries->merge(response.entries);
+    context->max_entries -= response_size;
+    if (!context->max_entries) { //yay, we're done!
+      dout(20) << "reached requested number of objects, cleaning up and exiting" << dendl;
+      // Clear in_loop before completing, as the out-of-pgs path below does:
+      // the ListContext belongs to the caller, so it must not be written
+      // after the caller has been told the listing is finished.
+      context->list_context->in_loop = false;
+      context->final_finish->finish(0);
+      delete context->final_finish;
+      context->list_context = NULL;
+      context->entries = NULL;
+      context->objecter = NULL;
+      // delete context;
+      return;
+    }
+    if ((response_size == context->request_size) && context->list_context->cookie) {
+      //there are probably more objects in this pg
+      dout(20) << "expect more objects in current pg, proceeding" << dendl;
+      context->list_context->in_loop = true;
+      list_objects(context->pool_id, context->pool_snap_seq, context->max_entries,
+                   *context->entries, context->list_context, context->final_finish);
+      // final_finish now belongs to the follow-up request.
+      context->final_finish = NULL;
+      context->list_context = NULL;
+      context->entries = NULL;
+      context->objecter = NULL;
+      // delete context;
+      return;
+    }
+  }
+  //if we make this this far, there are no more objects in the current pg. Move on!
+  dout(20) << "emptied current pg, moving on to next one" << dendl;
+  ++context->list_context->seed;
+  dout(20) << "new seed value: " << context->list_context->seed <<dendl;
+  if(context->list_context->seed == context->list_context->starting_pg_num){ //out of pgs!
+    dout(20) << "out of pgs, returning to" << context->final_finish << dendl;
+    context->list_context->in_loop = false;
+    context->final_finish->finish(0);
+    delete context->final_finish;
+    context->list_context = NULL;
+    context->entries = NULL;
+    context->objecter = NULL;
+    return;
+  }
+  // The cookie is a position inside the placement group that was just
+  // exhausted; start the next group from its beginning rather than sending
+  // the old handle along with the first pg_ls for it.
+  context->list_context->cookie = 0;
+  context->list_context->in_loop = true;
+  list_objects(context->pool_id, context->pool_snap_seq, context->max_entries,
+               *context->entries, context->list_context, context->final_finish);
+  // final_finish now belongs to the follow-up request.
+  context->final_finish = NULL;
+  context->list_context = NULL;
+  context->entries = NULL;
+  context->objecter = NULL;
+  // delete context;
+  return;
+}
+
+
//snapshots
void Objecter::create_pool_snap(int *reply, int pool, string& snapName, Context *onfinish) {
};
void tick();
void resend_slow_ops();
-
+ void _list_reply(void *cxt);
/*** track pending operations ***/
// read
delete fin;
}
};
+
+ // Pools and statistics
+ // Cursor for an object listing that spans several list_objects() calls.
+ struct ListContext {
+   int seed;                             // placement group currently being listed
+   __u64 cookie;                         // handle from the last pg_ls reply, sent with the next pg_ls
+   std::list<object_t> list;             // buffered results; filled through RadosClient::list by the C API
+   std::list<object_t>::iterator iter;   // not set by the constructor
+   __u64 pos;                            // compared with total by the C API to decide when to refill list
+   __u64 total;
+   int starting_pg_num;                  // pg count seen when listing began; 0 means not recorded yet
+   bool in_loop;                         // true while list_objects() is re-entered from _list_reply()
+
+   ListContext()
+     : seed(0),
+       cookie(0),
+       pos(0),
+       total(0),
+       starting_pg_num(0),
+       in_loop(false)
+   {}
+ };
+
+ // Completion attached to the pg_ls read sent by list_objects().  It keeps
+ // the reply buffer plus the request parameters and forwards to
+ // Objecter::_list_reply() when the read finishes.
+ struct C_List : public Context {
+   Objecter *objecter;             // objecter whose _list_reply() is invoked
+   int pool_id;
+   int pool_snap_seq;
+   int max_entries;                // objects still wanted by the caller
+   int request_size;               // size asked for in this particular pg_ls
+   std::list<object_t> *entries;   // caller's result list
+   ListContext *list_context;      // caller's listing cursor
+   Context *final_finish;          // completed when the whole listing is over
+   bufferlist bl;                  // receives the encoded PGLSResponse
+
+   C_List(int pool, int snap_seq, int wanted, int req_size,
+          std::list<object_t> *result_list, ListContext *cursor,
+          Context *on_done, Objecter *owner)
+     : objecter(owner),
+       pool_id(pool),
+       pool_snap_seq(snap_seq),
+       max_entries(wanted),
+       request_size(req_size),
+       entries(result_list),
+       list_context(cursor),
+       final_finish(on_done)
+   {}
+
+   // The result code r is not looked at; the reply is decoded from bl.
+   void finish(int r) {
+     objecter->_list_reply(static_cast<void *>(this));
+   }
+ };
- //
struct PoolStatOp {
tid_t tid;
vector<string> pools;
o->snapc = snapc;
return op_submit(o);
}
+
+
+ void list_objects(int pool_id, int pool_snap_seq, int max_entries, list<object_t>& entries,
+ ListContext *p, Context *onfinish);
+
// -------------------------
// snapshots
private: