]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados: pgls fixes, uses context
authorYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 3 Jun 2009 23:56:58 +0000 (16:56 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 3 Jun 2009 23:56:58 +0000 (16:56 -0700)
src/include/librados.h
src/include/rados.h
src/librados.cc
src/os/FileStore.cc
src/os/ObjectStore.h
src/osd/PGLS.h [new file with mode: 0644]
src/osd/ReplicatedPG.cc
src/osdc/Objecter.h
src/testradospp.cc

index 7aa5554753e04d6714380d78226b61435360c1c9..fffaacf94ab75e389d5e89d31dde4181463b421e 100644 (file)
@@ -20,11 +20,13 @@ extern "C" {
 int rados_initialize(int argc, const char **argv); /* arguments are optional */
 void rados_deinitialize();
 
+typedef void *rados_list_ctx_t;
+
 /* pools */
 typedef int rados_pool_t;
 int rados_open_pool(const char *name, rados_pool_t *pool);
 int rados_close_pool(rados_pool_t pool);
-int rados_list(rados_pool_t pool);
+int rados_list(rados_pool_t pool, int max, struct ceph_object *entries, rados_list_ctx_t *ctx);
 
 /* read/write objects */
 int rados_write(rados_pool_t pool, struct ceph_object *oid, off_t off, const char *buf, size_t len);
@@ -61,7 +63,6 @@ public:
 
   int open_pool(const char *name, rados_pool_t *pool);
   int close_pool(rados_pool_t pool);
-  int list(rados_pool_t pool, vector<string>& entries);
 
   int write(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
   int read(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
@@ -70,7 +71,13 @@ public:
   int exec(rados_pool_t pool, object_t& oid, const char *cls, const char *method,
              bufferlist& inbl, bufferlist& outbl);
 
-  
+ struct ListCtx {
+   void *ctx;
+   ListCtx() : ctx(NULL) {}
+ };
+
+  int list(rados_pool_t pool, int max, vector<ceph_object>& entries, Rados::ListCtx& ctx);
+
   // -- aio --
   struct AioCompletion {
     void *pc;
index f82b8d0ec846c74a16f02f7fa90253c26057bff2..08ef01d481bd23fecce9d93f0e232bc70ffe0547 100644 (file)
@@ -352,6 +352,9 @@ struct ceph_osd_op {
                        __u8 argc;
                        __le32 indata_len;
                } __attribute__ ((packed));
+               struct {
+                       __le64 pgls_cookie, count;
+               };
        };
 } __attribute__ ((packed));
 
index 1159c586008cffa1010a07c4cb2af6ab0bcab0ab..60bd52f950f34c8d8782541b8ebe698be8bb1631 100644 (file)
@@ -26,6 +26,7 @@ using namespace std;
 #include "mon/MonMap.h"
 #include "mds/MDS.h"
 #include "osd/OSDMap.h"
+#include "osd/PGLS.h"
 
 #include "msg/SimpleMessenger.h"
 
@@ -69,14 +70,20 @@ public:
     return osdmap.lookup_pg_pool_name(name);
   }
 
-  int list(int pool, vector<string>& entries);
-
   int write(int pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
   int read(int pool, object_t& oid, off_t off, bufferlist& bl, size_t len);
   int remove(int pool, object_t& oid);
 
   int exec(int pool, object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
 
+  struct PGLSOp {
+    int seed;
+    __u64 cookie;
+
+   PGLSOp() : seed(0), cookie(0) {}
+  };
+
+  int list(int pool, int max_entries, vector<ceph_object>& entries, RadosClient::PGLSOp& op);
 
   // --- aio ---
   struct AioCompletion {
@@ -307,7 +314,7 @@ bool RadosClient::_dispatch(Message *m)
   return true;
 }
 
-int RadosClient::list(int pool, vector<string>& entries)
+int RadosClient::list(int pool, int max_entries, vector<ceph_object>& entries, RadosClient::PGLSOp& op)
 {
   SnapContext snapc;
   utime_t ut = g_clock.now();
@@ -320,41 +327,61 @@ int RadosClient::list(int pool, vector<string>& entries)
 
   memset(&oid, 0, sizeof(oid));
 
-  dout(0) << hex << "pool=" << dec << pool << dendl;
+  dout(0) << hex << "pool=" << dec << pool << " op.cookie=" << op.cookie << dendl;
 
   ceph_object_layout layout;
 retry:
   int pg_num = objecter->osdmap->get_pg_num(pool);
 
-  for (int i=0; i<pg_num; i++) {
-   int num = objecter->osdmap->get_pg_layout(pool, i, layout);
-   if (num != pg_num)  /* ahh.. race! */
-      goto retry;
+  for (;op.seed <pg_num; op.seed++) {
+   int response_size;
+   int req_size;
 
-    lock.Lock();
+   do {
+     int num = objecter->osdmap->get_pg_layout(pool, op.seed, layout);
+     if (num != pg_num)  /* ahh.. race! */
+        goto retry;
 
-    ObjectRead rd;
-    bufferlist bl;
-    rd.pg_ls(0, 1024);
-    Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
-    objecter->read(oid, layout, rd, CEPH_NOSNAP, &bl, 0, onack);
+      lock.Lock();
 
-    while (!done)
-      cond.Wait(lock);
+      ObjectRead rd;
+      bufferlist bl;
+#define MAX_REQ_SIZE 1024
+      req_size = min(MAX_REQ_SIZE, max_entries);
+      rd.pg_ls(req_size, op.cookie);
 
-    lock.Unlock();
-    dout(0) << "after pg_ls(" << i << ") r=" << r << " got " << bl.length() << " bytes ol_pgls=" << hex << layout.ol_pgid << dec << dendl;
+      Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
+      objecter->read(oid, layout, rd, CEPH_NOSNAP, &bl, 0, onack);
 
-    vector<pobject_t> ls;
-    bufferlist::iterator iter = bl.begin();
-    ::decode(ls, iter);
-    if (ls.size() > 0) {
-      vector<pobject_t>::iterator ls_iter;
+      while (!done)
+        cond.Wait(lock);
 
-      for (ls_iter = ls.begin(); ls_iter != ls.end(); ++ls_iter) {
-        dout(0) << "entry: " << *ls_iter << dendl;
+      lock.Unlock();
+      dout(0) << "after pg_ls(" << op.seed << ") r=" << r << " got " << bl.length() << " bytes ol_pgls=" << hex << layout.ol_pgid << dec << dendl;
+      dout(0) << "max_entries=" << max_entries << dendl;
+
+      bufferlist::iterator iter = bl.begin();
+      PGLSResponse response;
+      ::decode(response, iter);
+      op.cookie = (__u64)response.handle;
+      response_size = response.entries.size();
+      if (response_size) {
+        vector<pobject_t>::iterator ls_iter;
+
+        for (ls_iter = response.entries.begin(); ls_iter != response.entries.end(); ++ls_iter) {
+          dout(0) << "entry: " << *ls_iter << dendl;
+          ceph_object obj = (ceph_object)ls_iter->oid;
+          entries.push_back(obj);
+        }
+        max_entries -= response_size;
+        dout(0) << "op.cookie=" << op.cookie << dendl;
+
+        if (!max_entries)
+          return r;
+      } else {
+        op.cookie = 0;
       }
-    }
+    } while ((response_size == req_size) && op.cookie);
  }
 
   return r;
@@ -550,12 +577,22 @@ bool Rados::initialize(int argc, const char *argv[])
   return client->init();
 }
 
-int Rados::list(rados_pool_t pool, vector<string>& entries)
+int Rados::list(rados_pool_t pool, int max, vector<ceph_object>& entries, Rados::ListCtx& ctx)
 {
   if (!client)
     return -EINVAL;
 
-  return client->list(pool, entries);
+  RadosClient::PGLSOp *op;
+  if (!ctx.ctx) {
+    ctx.ctx = new RadosClient::PGLSOp;
+    if (!ctx.ctx)
+      return -ENOMEM;
+  }
+
+  op = (RadosClient::PGLSOp *)ctx.ctx;
+
+
+  return client->list(pool, max, entries, *op);
 }
 
 int Rados::write(rados_pool_t pool, object_t& oid, off_t off, bufferlist& bl, size_t len)
@@ -781,11 +818,27 @@ extern "C" int rados_exec(rados_pool_t pool, ceph_object *o, const char *cls, co
   return ret;
 }
 
-extern "C" int rados_list(rados_pool_t pool)
+extern "C" int rados_list(rados_pool_t pool, int max, struct ceph_object *entries, rados_list_ctx_t *ctx)
 {
   int ret;
-  vector<string> entries;
-  ret = radosp->list(pool, entries);
+
+  if (!*ctx) {
+    *ctx = new RadosClient::PGLSOp;
+    if (!*ctx)
+      return -ENOMEM;
+  }
+  RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*ctx;
+
+  vector<ceph_object> vec;
+  ret = radosp->list(pool, max, vec, *op);
+  if (!vec.size()) {
+    delete op;
+    *ctx = NULL;
+  }
+
+  for (int i=0; i<vec.size(); i++) {
+    entries[i] = vec[i];
+  }
 
   return ret;
 }
index c68182619421e40f492868606da4b9e4536d39fd..06f8487dc1f8b6d4a7e159bcaf79292e8456fe40 100644 (file)
@@ -1895,17 +1895,19 @@ int FileStore::collection_list_partial(coll_t c, vector<pobject_t>& ls, int max_
   // first, build (ino, object) list
   vector< pair<ino_t,pobject_t> > inolist;
 
-  if (handle)
-    dir = *(DIR **)handle;
 
-  if (!dir)
-    dir = ::opendir(fn);
+  dir = ::opendir(fn);
 
   if (!dir) {
     dout(0) << "error opening directory " << fn << dendl;
     return -errno;
   }
 
+  if (handle) {
+    dout(0) << "seeking to position " << *(off_t *)handle << dendl;
+    seekdir(dir, *(off_t *)handle);
+  }
+
   for (int i=0; i<max_count; i++) {
     errno = 0;
     de = ::readdir(dir);
@@ -1917,8 +1919,13 @@ int FileStore::collection_list_partial(coll_t c, vector<pobject_t>& ls, int max_
       break;
 
     // parse
-    if (de->d_name[0] == '.') continue;
+    if (de->d_name[0] == '.') {
+      i--;
+      continue;
+    }
     //cout << "  got object " << de->d_name << std::endl;
+
+    dout(0) << "readdir: " << de->d_name << dendl;
     pobject_t o;
     if (parse_object(de->d_name, o)) {
       inolist.push_back(pair<ino_t,pobject_t>(de->d_ino, o));
@@ -1926,11 +1933,12 @@ int FileStore::collection_list_partial(coll_t c, vector<pobject_t>& ls, int max_
     }
   }
 
-  if (!handle || !de)
-    ::closedir(dir);
+  if (handle) {
+    *handle = (collection_list_handle_t)telldir(dir);
+    dout(0) << "returning handle=" << (__u64)*handle << dendl;
+  }
 
-  if (handle)
-    *handle = (collection_list_handle_t)dir;
+  ::closedir(dir);
 
   // build final list
   ls.resize(inolist.size());
index 0c384f9a79335ab30ef11d62125e46f2e06db7cf..0202e9e10b8d541f4e5c4656e4ed9552d28f8ef7 100644 (file)
@@ -38,7 +38,7 @@ using std::vector;
 # define MIN(a,b) ((a) < (b) ? (a):(b))
 #endif
 
-typedef void *collection_list_handle_t;
+typedef __u64 collection_list_handle_t;
 
 /*
  * low-level interface to the local OSD file system
diff --git a/src/osd/PGLS.h b/src/osd/PGLS.h
new file mode 100644 (file)
index 0000000..a9cb131
--- /dev/null
@@ -0,0 +1,27 @@
+#ifndef __PGLS_H
+#define __PGLS_H
+
+
+#include "include/types.h"
+#include "os/ObjectStore.h"
+
+struct PGLSResponse {
+  collection_list_handle_t handle; 
+  vector<pobject_t> entries;
+
+  void encode(bufferlist& bl) const {
+    ::encode((__u64)handle, bl);
+    ::encode(entries, bl);
+  }
+  void decode(bufferlist::iterator& bl) {
+    __u64 tmp;
+    ::decode(tmp, bl);
+    handle = (collection_list_handle_t)tmp;
+    ::decode(entries, bl);
+  }
+};
+
+WRITE_CLASS_ENCODER(PGLSResponse)
+
+
+#endif
index b1a8bb4035b5a4f96caf950dc525fb7cfb26f4f9..e57886371a401aba8e95903d3564123577d65938 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "ReplicatedPG.h"
 #include "OSD.h"
+#include "PGLS.h"
 
 #include "common/arch.h"
 #include "common/Logger.h"
@@ -380,14 +381,11 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
       {
         dout(10) << " pgls pg=" << op->get_pg() << dendl;
        // read into a buffer
-        vector<pobject_t> vec;
-        collection_list_handle_t handle = NULL;
-       result = osd->store->collection_list_partial(op->get_pg().to_coll(), vec, p->length, &handle);
+        PGLSResponse response;
+        response.handle = (collection_list_handle_t)(__u64)(p->pgls_cookie);
+       result = osd->store->collection_list_partial(op->get_pg().to_coll(), response.entries, p->length, &response.handle);
        if (!result) {
-#if 0
-         ctx->data_off = op.offset;
-#endif
-         ::encode(vec, outdata);
+         ::encode(response, outdata);
         }
        dout(10) << " pgls result=" << result << " outdata.length()=" << outdata.length() << dendl;
       }
@@ -864,22 +862,6 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<ceph_osd_op>& ops,
       }
       break;
 
-    case CEPH_OSD_OP_PGLS:
-      {
-       // read into a buffer
-        vector<pobject_t> vec;
-        collection_list_handle_t handle = NULL;
-       int r = osd->store->collection_list_partial(info.pgid.to_coll(), vec, op.length, &handle);
-       if (!r) {
-         ctx->data_off = op.offset;
-         ::encode(vec, odata);
-        } else {
-         result = r;
-       }
-       dout(10) << " read got " << r << " / " << op.length << " bytes from obj " << soid << dendl;
-      }
-      break;
-
     case CEPH_OSD_OP_RDCALL:
       {
        string cname, mname;
index 2314d11bc775a152c1130b936e07161ab49f3f9f..e3301aa5162990ac350a5c4da281ad447fff3982 100644 (file)
@@ -72,6 +72,14 @@ struct ObjectOperation {
     data.append(method, ops[s].method_len);
     data.append(indata);
   }
+  void add_pgls(int op, __u64 count, __u64 cookie) {
+    int s = ops.size();
+    ops.resize(s+1);
+    memset(&ops[s], 0, sizeof(ops[s]));
+    ops[s].op = op;
+    ops[s].count = count;
+    ops[s].pgls_cookie = cookie;
+  }
 
   ObjectOperation() : flags(0) {}
 };
@@ -93,8 +101,8 @@ struct ObjectRead : public ObjectOperation {
     add_call(CEPH_OSD_OP_RDCALL, cname, method, indata);
   }
 
-  void pg_ls(__u64 off, __u64 len) {
-    add_data(CEPH_OSD_OP_PGLS, off, len);
+  void pg_ls(__u64 count, __u64 cookie) {
+    add_pgls(CEPH_OSD_OP_PGLS, count, cookie);
     flags |= CEPH_OSD_FLAG_PGOP;
   }
 };
index c10a663e3cf7d3c34ab25a28bd24728e7aeb2cbb..ea32fd898bc3998d9c8d46847b609a0eb9e56812 100644 (file)
@@ -44,7 +44,7 @@ int main(int argc, const char **argv)
 
   object_t oid;
   memset(&oid, 0, sizeof(oid));
-  oid.ino = 0x2010;
+  oid.ino = 0x2020;
 
   rados_pool_t pool;
   int r = rados.open_pool("data", &pool);
@@ -70,9 +70,18 @@ int main(int argc, const char **argv)
   cout << "read result=" << bl2.c_str() << std::endl;
   cout << "size=" << size << std::endl;
 
-  vector<string> vec;
-  r = rados.list(pool, vec);
-  cout << "read result=" << r << " pool=" << hex << pool << dec << std::endl;
+  Rados::ListCtx ctx;
+  int entries;
+  do {
+    vector<ceph_object> vec;
+    r = rados.list(pool, 1, vec, ctx);
+    entries = vec.size();
+    cout << "list result=" << r << " entries=" << entries << std::endl;
+    vector<ceph_object>::iterator iter;
+    for (iter = vec.begin(); iter != vec.end(); ++iter) {
+      cout << *iter << std::endl;
+    }
+  } while (entries);
 #if 0
   r = rados.remove(pool, oid);
   cout << "remove result=" << r << std::endl;