]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Objecter: now has list instead of librados. Hurrah.
authorGreg Farnum <gregf@hq.newdream.net>
Fri, 19 Jun 2009 00:04:55 +0000 (17:04 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Fri, 19 Jun 2009 00:05:17 +0000 (17:05 -0700)
src/librados.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 6545727e6b53668a601e2508bfc47c519ab58112..214f9e051a9513f3c8300db5789d00d454cb8349 100644 (file)
@@ -107,22 +107,13 @@ public:
 
   int exec(PoolCtx& pool, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
 
-  struct PGLSOp {
-    int seed;
-    __u64 cookie;
-    std::list<object_t> list;
-    std::list<object_t>::iterator iter;
-    __u64 pos;
-    __u64 total;
-
-   PGLSOp() : seed(0), cookie(0), pos(0), total(0) {}
-  };
 
   int list_pools(std::vector<string>& ls);
   int get_pool_stats(std::vector<string>& ls, map<string,rados_pool_stat_t>& result);
   int get_fs_stats( rados_statfs_t& result );
 
-  int list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op);
+  int list(PoolCtx& pool, int max_entries, std::list<object_t>& entries,
+                       Objecter::ListContext *context);
 
   // --- aio ---
   struct AioCompletion {
@@ -558,67 +549,23 @@ int RadosClient::snap_get_stamp(PoolCtx *pool, rados_snap_t snapid, time_t *t)
 
 // IO
 
-int RadosClient::list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op)
-{
-  utime_t ut = g_clock.now();
-
+int RadosClient::list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, Objecter::ListContext *context) {
   Cond cond;
   bool done;
   int r = 0;
   object_t oid;
+  Mutex mylock("RadosClient::list::mylock");
 
-  memset(&oid, 0, sizeof(oid));
-  entries.clear();
-
-  ceph_object_layout layout;
-retry:
-  int pg_num = objecter->osdmap->get_pg_num(pool.poolid);
-  
-  for (;op.seed <pg_num; op.seed++) {
-    int response_size;
-    int req_size;
-    
-    do {
-      lock.Lock();
-      int num = objecter->osdmap->get_pg_layout(pool.poolid, op.seed, layout);
-      lock.Unlock();
-      if (num != pg_num)  /* ahh.. race! */
-        goto retry;
-      
-      ObjectOperation rd;
-      bufferlist bl;
-#define MAX_REQ_SIZE 1024
-      req_size = min(MAX_REQ_SIZE, max_entries);
-      rd.pg_ls(req_size, op.cookie);
-
-      Mutex mylock("RadosClient::list::mylock");
-      Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
+  lock.Lock();
+  objecter->list_objects(pool.poolid, pool.snap_seq, max_entries, entries, context,
+                        new C_SafeCond(&mylock, &cond, &done, &r));
+  lock.Unlock();
 
-      lock.Lock();
-      objecter->read(oid, layout, rd, pool.snap_seq, &bl, 0, onack);
-      lock.Unlock();
+  mylock.Lock();
+  while(!done)
+    cond.Wait(mylock);
+  mylock.Unlock();
 
-      mylock.Lock();
-      while (!done)
-        cond.Wait(mylock);
-      mylock.Unlock();
-
-      bufferlist::iterator iter = bl.begin();
-      PGLSResponse response;
-      ::decode(response, iter);
-      op.cookie = (__u64)response.handle;
-      response_size = response.entries.size();
-      if (response_size) {
-       entries.merge(response.entries);
-        max_entries -= response_size;
-        if (!max_entries)
-          return r;
-      } else {
-        op.cookie = 0;
-      }
-    } while ((response_size == req_size) && op.cookie);
-  }
-  
   return r;
 }
 
@@ -920,16 +867,15 @@ int Rados::list(rados_pool_t pool, int max, std::list<object_t>& entries, Rados:
   if (!client)
     return -EINVAL;
 
-  RadosClient::PGLSOp *op;
+  Objecter::ListContext *op;
   if (!ctx.ctx) {
-    ctx.ctx = new RadosClient::PGLSOp;
+    ctx.ctx = new Objecter::ListContext();
     if (!ctx.ctx)
       return -ENOMEM;
   }
 
-  op = (RadosClient::PGLSOp *)ctx.ctx;
-
-  return client->list(*(RadosClient::PoolCtx *)pool, max, entries, *op);
+  op = (Objecter::ListContext *) ctx.ctx;
+  return client->list(*(RadosClient::PoolCtx *)pool, max, entries, op);
 }
 
 int Rados::write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
@@ -1255,7 +1201,7 @@ extern "C" void rados_pool_init_ctx(rados_list_ctx_t *ctx)
 extern "C" void rados_pool_close_ctx(rados_list_ctx_t *ctx)
 {
   if (*ctx) {
-    RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*ctx;
+    Objecter::ListContext *op = (Objecter::ListContext *)*ctx;
     delete op;
     *ctx = NULL;
   }
@@ -1267,15 +1213,15 @@ extern "C" int rados_pool_list_next(rados_pool_t pool, const char **entry, rados
   RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
 
   if (!*listctx) {
-    *listctx = new RadosClient::PGLSOp;
+    *listctx = new Objecter::ListContext;
     if (!*listctx)
       return -ENOMEM;
   }
-  RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*listctx;
+  Objecter::ListContext *op = (Objecter::ListContext *)*listctx;
   if (op->pos == op->total) {
     op->list.clear();
 #define MAX_ENTRIES 1024
-    ret = radosp->list(*ctx, MAX_ENTRIES, op->list, *op);
+    ret = radosp->list(*ctx, MAX_ENTRIES, op->list, op);
     if (!op->list.size()) {
       delete op;
       *listctx = NULL;
index 0865fe364d598442ad6f42b1a5b4970cb8db5c1a..9c1f884bc32d7fa6bfdc76e85e94c7636ddb97c6 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "Objecter.h"
 #include "osd/OSDMap.h"
+#include "osd/PGLS.h"
 #include "mon/MonMap.h"
 
 #include "msg/Messenger.h"
@@ -39,6 +40,7 @@
 
 #include "config.h"
 
+#define MAX_REQ_SIZE 1024
 #define DOUT_SUBSYS objecter
 #undef dout_prefix
 #define dout_prefix *_dout << dbeginl << messenger->get_myname() << ".objecter "
@@ -471,6 +473,7 @@ tid_t Objecter::op_submit(Op *op)
 
 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 {
+  dout(10) << "in handle_osd_op_reply" << dendl;
   // get pio
   tid_t tid = m->get_tid();
 
@@ -563,6 +566,123 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   delete m;
 }
 
+
+void Objecter::list_objects(int pool_id, int pool_snap_seq, int max_entries,
+                           std::list<object_t>& entries, ListContext * list_context, Context *onfinish) {
+
+  dout(10) << "list_objects" << dendl;
+  dout(20) << "pool_id " << pool_id
+          << "\npool_snap_seq " << pool_snap_seq
+          << "\nmax_entries " << max_entries
+          << "\n&entries " << &entries
+          << "\nlist_context " << list_context
+          << "\nonfinish " << onfinish
+          << "\nlist_context->seed" << list_context->seed
+          << "\nlist_context->cookie" << list_context->cookie << dendl;
+
+  if (!list_context->in_loop)
+    entries.clear();
+
+  ceph_object_layout layout;
+  int request_size;
+
+  object_t oid;
+  memset(&oid, 0, sizeof(oid)); //the read we perform later doesn't refer to an object,
+  //so make sure nothing can think it does
+
+  int pg_num = osdmap->get_pg_layout(pool_id, list_context->seed, layout);
+  if (list_context->starting_pg_num == 0) {// there can't be zero pgs!
+    list_context->starting_pg_num = pg_num;
+    dout(20) << pg_num << " placement groups" << dendl;
+  }
+  if (list_context->starting_pg_num != pg_num) { //start reading from the beginning; the pgs have changed
+    dout(10) << "The placement groups have changed, restarting with " << pg_num << dendl;
+    list_context->seed = 0;
+    list_context->cookie = 0;
+    list_context->starting_pg_num = pg_num;
+    osdmap->get_pg_layout(pool_id, list_context->seed, layout);
+  }
+  if(list_context->seed == pg_num){ //this context got all the way through
+    onfinish->finish(0);
+    delete onfinish;
+  }
+
+  ObjectOperation op;
+  request_size = min(MAX_REQ_SIZE, max_entries);
+  op.pg_ls(request_size, list_context->cookie);
+
+  C_List * onack = new C_List(pool_id, pool_snap_seq, max_entries, request_size,
+                              &entries, list_context, onfinish, this);
+  read(oid, layout, op, pool_snap_seq, &onack->bl, 0,(Context *)onack);
+}
+
+void Objecter::_list_reply(void *cxt)
+{
+  dout(10) << "_list_reply" << dendl;
+  C_List * context = (C_List *) cxt;
+  bufferlist::iterator iter = context->bl.begin();
+  PGLSResponse response;
+  ::decode(response, iter);
+  context->list_context->cookie = (__u64)response.handle;
+  int response_size = response.entries.size();
+  dout(20) << "response.entries.size " << response_size
+          <<"\nresponse.entries " << response.entries << dendl;
+  if (response_size) {
+    dout(20) << "got a response with objects, proceeding" << dendl;
+    context->entries->merge(response.entries);
+    context->max_entries -= response_size;
+    if (!context->max_entries) { //yay, we're done!
+      dout(20) << "reached requested number of objects, cleaning up and exiting" << dendl;
+      context->final_finish->finish(0);
+      delete context->final_finish;
+      context->list_context->in_loop = false;
+      context->list_context = NULL;
+      context->entries = NULL;
+      context->objecter = NULL;
+      //      delete context;
+      return;
+    }
+    if ((response_size == context->request_size) && context->list_context->cookie) {
+      //there are probably more objects in this pg
+      dout(20) << "expect more objects in current pg, proceeding" << dendl;
+      context->list_context->in_loop = true;
+      list_objects(context->pool_id, context->pool_snap_seq, context->max_entries,
+                  *context->entries, context->list_context, context->final_finish);
+      context->final_finish = NULL;
+      context->list_context = NULL;
+      context->entries = NULL;
+      context->objecter = NULL;
+      //      delete context;
+      return;
+    }
+  }
+  //if we make this this far, there are no more objects in the current pg. Move on!
+  dout(20) << "emptied current pg, moving on to next one" << dendl;
+  ++context->list_context->seed;
+  dout(20) << "new seed value: " << context->list_context->seed <<dendl;
+  if(context->list_context->seed == context->list_context->starting_pg_num){ //out of pgs!
+    dout(20) << "out of pgs, returning to" << context->final_finish << dendl;
+    context->list_context->in_loop = false;
+    context->final_finish->finish(0);
+    delete context->final_finish;
+    context->list_context = NULL;
+    context->entries = NULL;
+    context->objecter = NULL;
+    return;
+  }
+  //  context->list_context->cookie = 0;
+  context->list_context->in_loop = true;
+  list_objects(context->pool_id, context->pool_snap_seq, context->max_entries,
+              *context->entries, context->list_context, context->final_finish);
+  context->final_finish = NULL;
+  context->list_context = NULL;
+  context->entries = NULL;
+  context->objecter = NULL;
+  //  delete context;
+  return;
+}
+
+
 //snapshots
 
 void Objecter::create_pool_snap(int *reply, int pool, string& snapName, Context *onfinish) {
index 2ee90d69fefee050b58f57dfa4082d73d62297c5..246da414837c218365dc02a9534adf7558138911 100644 (file)
@@ -186,7 +186,7 @@ class Objecter {
   };
   void tick();
   void resend_slow_ops();
-
+  void _list_reply(void *cxt);
 
   /*** track pending operations ***/
   // read
@@ -246,8 +246,40 @@ class Objecter {
       delete fin;
     }
   };
+
+  // Pools and statistics 
+  struct ListContext {
+    int seed;
+    __u64 cookie;
+    std::list<object_t> list;
+    std::list<object_t>::iterator iter;
+    __u64 pos;
+    __u64 total;
+    int starting_pg_num;
+    bool in_loop;
+
+   ListContext() : seed(0), cookie(0), pos(0), total(0), starting_pg_num(0), in_loop(false){}
+  };
+
+  struct C_List : public Context {
+    Objecter *objecter;
+    int pool_id;
+    int pool_snap_seq;
+    int max_entries;
+    int request_size;
+    std::list<object_t> *entries;
+    ListContext *list_context;
+    Context *final_finish;
+    bufferlist bl;
+    C_List(int p_id, int p_snap, int max, int rs, std::list<object_t> *entries_p,
+          ListContext *lc, Context * finish, Objecter *o) :
+      objecter(o), pool_id(p_id), pool_snap_seq(p_snap), max_entries(max), request_size(rs),
+      entries(entries_p), list_context(lc), final_finish(finish) {}
+    void finish(int r) {
+      objecter->_list_reply((void *)this);
+    }
+  };
   
-  // 
   struct PoolStatOp {
     tid_t tid;
     vector<string> pools;
@@ -481,6 +513,11 @@ private:
     o->snapc = snapc;
     return op_submit(o);
   }
+
+
+  void list_objects(int pool_id, int pool_snap_seq, int max_entries, list<object_t>& entries,
+                   ListContext *p, Context *onfinish);
+
   // -------------------------
   // snapshots
 private: