]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rados: can set snapshot sequence
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 5 Jun 2009 20:24:25 +0000 (13:24 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 5 Jun 2009 20:25:11 +0000 (13:25 -0700)
src/include/librados.h
src/librados.cc
src/os/FileStore.cc
src/os/FileStore.h
src/os/ObjectStore.h
src/osd/ReplicatedPG.cc
src/testradospp.cc

index b2bfb0e777175d7b55a7239a4d300b23bee8681a..b656e097899269ad83c2ba4edd5ebac2f586a8f7 100644 (file)
@@ -23,9 +23,10 @@ void rados_deinitialize();
 typedef void *rados_list_ctx_t;
 
 /* pools */
-typedef int rados_pool_t;
+typedef void *rados_pool_t;
 int rados_open_pool(const char *name, rados_pool_t *pool);
 int rados_close_pool(rados_pool_t pool);
+void rados_set_snap(rados_pool_t pool, int seq);
 void rados_pool_init_ctx(rados_list_ctx_t *ctx);
 void rados_pool_close_ctx(rados_list_ctx_t *ctx);
 int rados_pool_list_next(rados_pool_t pool, const char **entry, rados_list_ctx_t *ctx);
@@ -66,6 +67,8 @@ public:
   int open_pool(const char *name, rados_pool_t *pool);
   int close_pool(rados_pool_t pool);
 
+  void set_snap(rados_pool_t pool, snapid_t seq);
+
   int write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
   int read(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
   int remove(rados_pool_t pool, const object_t& oid);
@@ -92,9 +95,9 @@ public:
     void put();
   };
 
-  int aio_read(int pool, const object_t& oid, off_t off, bufferlist *pbl, size_t len,
+  int aio_read(rados_pool_t pool, const object_t& oid, off_t off, bufferlist *pbl, size_t len,
               AioCompletion **pc);
-  int aio_write(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len,
+  int aio_write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len,
                AioCompletion **pc);
 
 };
index 03d533de72050393bbf4748a53f068d0351de2de..c931d6419a9f31f6ddadb8e3981649d59fc2b975 100644 (file)
@@ -66,15 +66,23 @@ public:
   ~RadosClient();
   bool init();
 
+  struct PoolCtx {
+    int poolid;
+    SnapContext snapc;
+    snapid_t snap_seq;
+  };
+
   int lookup_pool(const char *name) {
     return osdmap.lookup_pg_pool_name(name);
   }
 
-  int write(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
-  int read(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
-  int remove(int pool, const object_t& oid);
+  void set_snap(PoolCtx& pool, snapid_t seq);
+
+  int write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+  int read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len);
+  int remove(PoolCtx& pool, const object_t& oid);
 
-  int exec(int pool, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
+  int exec(PoolCtx& pool, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl);
 
   struct PGLSOp {
     int seed;
@@ -87,7 +95,7 @@ public:
    PGLSOp() : seed(0), cookie(0), pos(0), total(0) {}
   };
 
-  int list(int pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op);
+  int list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op);
 
   // --- aio ---
   struct AioCompletion {
@@ -198,12 +206,12 @@ public:
     }
   };
 
-  int aio_read(int pool, object_t oid, off_t off, bufferlist *pbl, size_t len,
+  int aio_read(PoolCtx& pool, object_t oid, off_t off, bufferlist *pbl, size_t len,
               AioCompletion **pc);
-  int aio_read(int pool, object_t oid, off_t off, char *buf, size_t len,
+  int aio_read(PoolCtx& pool, object_t oid, off_t off, char *buf, size_t len,
               AioCompletion **pc);
 
-  int aio_write(int pool, object_t oid, off_t off, const bufferlist& bl, size_t len,
+  int aio_write(PoolCtx& pool, object_t oid, off_t off, const bufferlist& bl, size_t len,
                AioCompletion **pc);
 
 };
@@ -318,9 +326,8 @@ bool RadosClient::_dispatch(Message *m)
   return true;
 }
 
-int RadosClient::list(int pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op)
+int RadosClient::list(PoolCtx& pool, int max_entries, std::list<object_t>& entries, RadosClient::PGLSOp& op)
 {
-  SnapContext snapc;
   utime_t ut = g_clock.now();
 
   Mutex lock("RadosClient::list");
@@ -334,14 +341,14 @@ int RadosClient::list(int pool, int max_entries, std::list<object_t>& entries, R
 
   ceph_object_layout layout;
 retry:
-  int pg_num = objecter->osdmap->get_pg_num(pool);
+  int pg_num = objecter->osdmap->get_pg_num(pool.poolid);
 
   for (;op.seed <pg_num; op.seed++) {
    int response_size;
    int req_size;
 
    do {
-     int num = objecter->osdmap->get_pg_layout(pool, op.seed, layout);
+     int num = objecter->osdmap->get_pg_layout(pool.poolid, op.seed, layout);
      if (num != pg_num)  /* ahh.. race! */
         goto retry;
 
@@ -354,7 +361,7 @@ retry:
       rd.pg_ls(req_size, op.cookie);
 
       Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
-      objecter->read(oid, layout, rd, CEPH_NOSNAP, &bl, 0, onack);
+      objecter->read(oid, layout, rd, pool.snap_seq, &bl, 0, onack);
 
       while (!done)
         cond.Wait(lock);
@@ -380,23 +387,58 @@ retry:
   return r;
 }
 
-int RadosClient::write(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
+void RadosClient::set_snap(PoolCtx& pool, snapid_t seq)
 {
-  SnapContext snapc;
+  if (!seq) {
+    seq = CEPH_NOSNAP;
+  }
+  pool.snap_seq = seq;
+}
+
+int RadosClient::write(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
+{
+#if 0
+  static SnapContext snapc;
+  static int i;
+
+  snapc.snaps.clear();
+
+#define START_SNAP 1
+
+  if (snapc.seq == 0)
+    snapc.seq = START_SNAP - 1;
+
+  ++snapc.seq;
+  for (i=0; i<snapc.seq-START_SNAP + 1; i++) {
+     snapc.snaps.push_back(snapc.seq - i);
+  }
+  i = 0;
+  for (vector<snapid_t>::iterator iter = snapc.snaps.begin();
+       iter != snapc.snaps.end(); ++iter, ++i) {
+    dout(0) << "snapc[" << i << "] = " << *iter << dendl;
+  }
+  dout(0) << "seq=" << snapc.seq << dendl;
+  dout(0) << "snapc=" << snapc << dendl;
+#endif
   utime_t ut = g_clock.now();
 
+  /* can't write to a snapshot */
+  if (pool.snap_seq != CEPH_NOSNAP)
+    return -EINVAL;
+
   Mutex lock("RadosClient::write");
   Cond cond;
   bool done;
   int r;
+
   Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
-  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
 
   dout(0) << "going to write" << dendl;
 
   lock.Lock();
   objecter->write(oid, layout,
-                 off, len, snapc, bl, ut, 0,
+                 off, len, pool.snapc, bl, ut, 0,
                  onack, NULL);
   while (!done)
     cond.Wait(lock);
@@ -405,7 +447,7 @@ int RadosClient::write(int pool, const object_t& oid, off_t off, bufferlist& bl,
   return len;
 }
 
-int RadosClient::aio_read(int pool, const object_t oid, off_t off, bufferlist *pbl, size_t len,
+int RadosClient::aio_read(PoolCtx& pool, const object_t oid, off_t off, bufferlist *pbl, size_t len,
                          AioCompletion **pc)
 {
   AioCompletion *c = new AioCompletion;
@@ -413,15 +455,15 @@ int RadosClient::aio_read(int pool, const object_t oid, off_t off, bufferlist *p
 
   c->pbl = pbl;
 
-  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
   objecter->read(oid, layout,
-                off, len, CEPH_NOSNAP, &c->bl, 0,
+                off, len, pool.snap_seq, &c->bl, 0,
                  onack);
 
   *pc = c;
   return 0;
 }
-int RadosClient::aio_read(int pool, const object_t oid, off_t off, char *buf, size_t len,
+int RadosClient::aio_read(PoolCtx& pool, const object_t oid, off_t off, char *buf, size_t len,
                          AioCompletion **pc)
 {
   AioCompletion *c = new AioCompletion;
@@ -430,35 +472,34 @@ int RadosClient::aio_read(int pool, const object_t oid, off_t off, char *buf, si
   c->buf = buf;
   c->maxlen = len;
 
-  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
   objecter->read(oid, layout,
-                off, len, CEPH_NOSNAP, &c->bl, 0,
+                off, len, pool.snap_seq, &c->bl, 0,
                  onack);
 
   *pc = c;
   return 0;
 }
 
-int RadosClient::aio_write(int pool, const object_t oid, off_t off, const bufferlist& bl, size_t len,
+int RadosClient::aio_write(PoolCtx& pool, const object_t oid, off_t off, const bufferlist& bl, size_t len,
                           AioCompletion **pc)
 {
-  SnapContext snapc;
   utime_t ut = g_clock.now();
 
   AioCompletion *c = new AioCompletion;
   Context *onack = new C_aio_Ack(c);
   Context *onsafe = new C_aio_Safe(c);
 
-  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
   objecter->write(oid, layout,
-                 off, len, snapc, bl, ut, 0,
+                 off, len, pool.snapc, bl, ut, 0,
                  onack, onsafe);
 
   *pc = c;
   return 0;
 }
 
-int RadosClient::remove(int pool, const object_t& oid)
+int RadosClient::remove(PoolCtx& pool, const object_t& oid)
 {
   SnapContext snapc;
   utime_t ut = g_clock.now();
@@ -468,7 +509,7 @@ int RadosClient::remove(int pool, const object_t& oid)
   bool done;
   int r;
   Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
-  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
 
   dout(0) << "going to write" << dendl;
 
@@ -483,10 +524,9 @@ int RadosClient::remove(int pool, const object_t& oid)
   return r;
 }
 
-int RadosClient::exec(int pool, const object_t& oid, const char *cls, const char *method,
+int RadosClient::exec(PoolCtx& pool, const object_t& oid, const char *cls, const char *method,
                      bufferlist& inbl, bufferlist& outbl)
 {
-  SnapContext snapc;
   utime_t ut = g_clock.now();
 
   Mutex lock("RadosClient::rdcall");
@@ -495,13 +535,13 @@ int RadosClient::exec(int pool, const object_t& oid, const char *cls, const char
   int r;
   Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
 
-  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
 
   lock.Lock();
 
   ObjectRead rd;
   rd.rdcall(cls, method, inbl);
-  objecter->read(oid, layout, rd, CEPH_NOSNAP, &outbl, 0, onack);
+  objecter->read(oid, layout, rd, pool.snap_seq, &outbl, 0, onack);
 
   while (!done)
     cond.Wait(lock);
@@ -512,23 +552,21 @@ int RadosClient::exec(int pool, const object_t& oid, const char *cls, const char
   return r;
 }
 
-int RadosClient::read(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
+int RadosClient::read(PoolCtx& pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
 {
-  SnapContext snapc;
-
   Mutex lock("RadosClient::read");
   Cond cond;
   bool done;
   int r;
   Context *onack = new C_SafeCond(&lock, &cond, &done, &r);
 
-  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool);
+  ceph_object_layout layout = objecter->osdmap->make_object_layout(oid, pool.poolid);
 
   dout(0) << "going to read" << dendl;
 
   lock.Lock();
   objecter->read(oid, layout,
-             off, len, CEPH_NOSNAP, &bl, 0,
+             off, len, pool.snap_seq, &bl, 0,
               onack);
   while (!done)
     cond.Wait(lock);
@@ -570,6 +608,16 @@ int Rados::initialize(int argc, const char *argv[])
   return client->init() ? 0 : -1;
 }
 
+void Rados::set_snap(rados_pool_t pool, snapid_t seq)
+{
+  if (!client)
+    return;
+
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
+
+  client->set_snap(*ctx, seq);
+}
+
 int Rados::list(rados_pool_t pool, int max, std::list<object_t>& entries, Rados::ListCtx& ctx)
 {
   if (!client)
@@ -584,7 +632,7 @@ int Rados::list(rados_pool_t pool, int max, std::list<object_t>& entries, Rados:
 
   op = (RadosClient::PGLSOp *)ctx.ctx;
 
-  return client->list(pool, max, entries, *op);
+  return client->list(*(RadosClient::PoolCtx *)pool, max, entries, *op);
 }
 
 int Rados::write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
@@ -592,7 +640,7 @@ int Rados::write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist&
   if (!client)
     return -EINVAL;
 
-  return client->write(pool, oid, off, bl, len);
+  return client->write(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
 }
 
 int Rados::remove(rados_pool_t pool, const object_t& oid)
@@ -600,7 +648,7 @@ int Rados::remove(rados_pool_t pool, const object_t& oid)
   if (!client)
     return -EINVAL;
 
-  return client->remove(pool, oid);
+  return client->remove(*(RadosClient::PoolCtx *)pool, oid);
 }
 
 int Rados::read(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len)
@@ -608,7 +656,7 @@ int Rados::read(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& b
   if (!client)
     return -EINVAL;
 
-  return client->read(pool, oid, off, bl, len);
+  return client->read(*(RadosClient::PoolCtx *)pool, oid, off, bl, len);
 }
 
 int Rados::exec(rados_pool_t pool, const object_t& oid, const char *cls, const char *method,
@@ -617,14 +665,21 @@ int Rados::exec(rados_pool_t pool, const object_t& oid, const char *cls, const c
   if (!client)
     return -EINVAL;
 
-  return client->exec(pool, oid, cls, method, inbl, outbl);
+  return client->exec(*(RadosClient::PoolCtx *)pool, oid, cls, method, inbl, outbl);
 }
 
 int Rados::open_pool(const char *name, rados_pool_t *pool)
 {
   int poolid = client->lookup_pool(name);
   if (poolid >= 0) {
-    *pool = poolid;
+    RadosClient::PoolCtx *ctx = new RadosClient::PoolCtx;
+    if (!ctx)
+      return -ENOMEM;
+
+    ctx->poolid = poolid;
+    ctx->snap_seq = CEPH_NOSNAP;
+
+    *pool = (rados_pool_t)ctx;
     return 0;
   }
   return poolid;
@@ -632,25 +687,29 @@ int Rados::open_pool(const char *name, rados_pool_t *pool)
 
 int Rados::close_pool(rados_pool_t pool)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
+  delete ctx;
   return 0;
 }
 
-int Rados::aio_read(int pool, const object_t& oid, off_t off, bufferlist *pbl, size_t len,
+int Rados::aio_read(rados_pool_t pool, const object_t& oid, off_t off, bufferlist *pbl, size_t len,
                    Rados::AioCompletion **pc)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   RadosClient::AioCompletion *c;
-  int r = client->aio_read(pool, oid, off, pbl, len, &c);
+  int r = client->aio_read(*ctx, oid, off, pbl, len, &c);
   if (r >= 0) {
     *pc = new AioCompletion((void *)c);
   }
   return r;
 }
 
-int Rados::aio_write(int pool, const object_t& oid, off_t off, bufferlist& bl, size_t len,
+int Rados::aio_write(rados_pool_t pool, const object_t& oid, off_t off, bufferlist& bl, size_t len,
                     AioCompletion **pc)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   RadosClient::AioCompletion *c;
-  int r = client->aio_write(pool, oid, off, bl, len, &c);
+  int r = client->aio_write(*ctx, oid, off, bl, len, &c);
   if (r >= 0) {
     *pc = new AioCompletion((void *)c);
   }
@@ -750,7 +809,12 @@ extern "C" int rados_open_pool(const char *name, rados_pool_t *pool)
 {
   int poolid = radosp->lookup_pool(name);
   if (poolid >= 0) {
-    *pool = poolid;
+    RadosClient::PoolCtx *ctx = new RadosClient::PoolCtx;
+    if (!ctx)
+      return -ENOMEM;
+    ctx->poolid = poolid;
+    ctx->snap_seq = CEPH_NOSNAP;
+    *pool = ctx;
     return 0;
   }
   return poolid;
@@ -758,29 +822,42 @@ extern "C" int rados_open_pool(const char *name, rados_pool_t *pool)
 
 extern "C" int rados_close_pool(rados_pool_t pool)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
+  delete ctx;
   return 0;
 }
 
+extern "C" void rados_set_snap(rados_pool_t pool, int seq)
+{
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
+
+  radosp->set_snap(*ctx, (snapid_t)seq);
+}
+
+
 extern "C" int rados_write(rados_pool_t pool, const char *o, off_t off, const char *buf, size_t len)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   object_t oid(o);
   bufferlist bl;
   bl.append(buf, len);
-  return radosp->write(pool, oid, off, bl, len);
+  return radosp->write(*ctx, oid, off, bl, len);
 }
 
 extern "C" int rados_remove(rados_pool_t pool, const char *o)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   object_t oid(o);
-  return radosp->remove(pool, oid);
+  return radosp->remove(*ctx, oid);
 }
 
 extern "C" int rados_read(rados_pool_t pool, const char *o, off_t off, char *buf, size_t len)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   int ret;
   object_t oid(o);
   bufferlist bl;
-  ret = radosp->read(pool, oid, off, bl, len);
+  ret = radosp->read(*ctx, oid, off, bl, len);
   if (ret >= 0) {
     if (bl.length() > len)
       return -ERANGE;
@@ -794,11 +871,12 @@ extern "C" int rados_read(rados_pool_t pool, const char *o, off_t off, char *buf
 extern "C" int rados_exec(rados_pool_t pool, const char *o, const char *cls, const char *method,
                          const char *inbuf, size_t in_len, char *buf, size_t out_len)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   object_t oid(o);
   bufferlist inbl, outbl;
   int ret;
   inbl.append(inbuf, in_len);
-  ret = radosp->exec(pool, oid, cls, method, inbl, outbl);
+  ret = radosp->exec(*ctx, oid, cls, method, inbl, outbl);
   if (ret >= 0) {
     if (outbl.length()) {
       if (outbl.length() > out_len)
@@ -823,23 +901,24 @@ extern "C" void rados_pool_close_ctx(rados_list_ctx_t *ctx)
   }
 }
 
-extern "C" int rados_pool_list_next(rados_pool_t pool, const char **entry, rados_list_ctx_t *ctx)
+extern "C" int rados_pool_list_next(rados_pool_t pool, const char **entry, rados_list_ctx_t *listctx)
 {
   int ret;
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
 
-  if (!*ctx) {
-    *ctx = new RadosClient::PGLSOp;
-    if (!*ctx)
+  if (!*listctx) {
+    *listctx = new RadosClient::PGLSOp;
+    if (!*listctx)
       return -ENOMEM;
   }
-  RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*ctx;
+  RadosClient::PGLSOp *op = (RadosClient::PGLSOp *)*listctx;
   if (op->pos == op->total) {
     op->list.clear();
 #define MAX_ENTRIES 1024
-    ret = radosp->list(pool, MAX_ENTRIES, op->list, *op);
+    ret = radosp->list(*ctx, MAX_ENTRIES, op->list, *op);
     if (!op->list.size()) {
       delete op;
-      *ctx = NULL;
+      *listctx = NULL;
       return -ENOENT;
     }
     op->pos = 0;
@@ -893,16 +972,18 @@ extern "C" int rados_aio_read(rados_pool_t pool, const char *o,
                               off_t off, char *buf, size_t len,
                               rados_completion_t *completion)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   object_t oid(o);
-  return radosp->aio_read(pool, oid, off, buf, len, (RadosClient::AioCompletion**)completion);
+  return radosp->aio_read(*ctx, oid, off, buf, len, (RadosClient::AioCompletion**)completion);
 }
 
 extern "C" int rados_aio_write(rados_pool_t pool, const char *o,
                               off_t off, const char *buf, size_t len,
                               rados_completion_t *completion)
 {
+  RadosClient::PoolCtx *ctx = (RadosClient::PoolCtx *)pool;
   object_t oid(o);
   bufferlist bl;
   bl.append(buf, len);
-  return radosp->aio_write(pool, oid, off, bl, len, (RadosClient::AioCompletion**)completion);
+  return radosp->aio_write(*ctx, oid, off, bl, len, (RadosClient::AioCompletion**)completion);
 }
index 4ab23d21721217bb179cc840108a25db72fe3fe9..dffb42776fe8f206e513eb0a4ba682fe272fbc87 100644 (file)
@@ -1881,7 +1881,7 @@ bool FileStore::collection_empty(coll_t c)
   return empty;
 }
 
-int FileStore::collection_list_partial(coll_t c, vector<sobject_t>& ls, int max_count, collection_list_handle_t *handle) 
+int FileStore::collection_list_partial(coll_t c, snapid_t seq, vector<sobject_t>& ls, int max_count, collection_list_handle_t *handle)
 {  
   if (fake_collections) return collections.collection_list(c, ls);
 
@@ -1929,6 +1929,10 @@ int FileStore::collection_list_partial(coll_t c, vector<sobject_t>& ls, int max_
     //cout << "  got object " << de->d_name << std::endl;
     sobject_t o;
     if (parse_object(de->d_name, o)) {
+      if (o.snap != seq) {
+        i--;
+        continue;
+      }
       inolist.push_back(pair<ino_t,sobject_t>(de->d_ino, o));
       ls.push_back(o);
     }
index 6c07af37a966bdce459a53cf2243f24a6a1e2253..d7c918b66c3e23293582e73626c7a425de14c5f0 100644 (file)
@@ -147,7 +147,7 @@ class FileStore : public JournalingObjectStore {
   int collection_stat(coll_t c, struct stat *st);
   bool collection_exists(coll_t c);
   bool collection_empty(coll_t c);
-  int collection_list_partial(coll_t c, vector<sobject_t>& o, int count, collection_list_handle_t *handle);
+  int collection_list_partial(coll_t c, snapid_t seq, vector<sobject_t>& o, int count, collection_list_handle_t *handle);
   int collection_list(coll_t c, vector<sobject_t>& o);
 
   int _create_collection(coll_t c);
index 6122001cef5d268a9e4f249466aff5548f88b39a..85800f43134fd050e129f1509d9278ab23820e69 100644 (file)
@@ -464,7 +464,7 @@ public:
   virtual int collection_getattr(coll_t cid, const char *name, bufferlist& bl) = 0;
   virtual int collection_getattrs(coll_t cid, map<nstring,bufferptr> &aset) = 0;
   virtual bool collection_empty(coll_t c) = 0;
-  virtual int collection_list_partial(coll_t c, vector<sobject_t>& o, int count, collection_list_handle_t *handle) = 0;
+  virtual int collection_list_partial(coll_t c, snapid_t seq, vector<sobject_t>& o, int count, collection_list_handle_t *handle) = 0;
   virtual int collection_list(coll_t c, vector<sobject_t>& o) = 0;
 
   /*
index 873edc94dac42c2bb38b17c390745c349af6607f..1e53f5527876e37c7b913705d295a1a09dfd79af 100644 (file)
@@ -384,7 +384,7 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
         PGLSResponse response;
         response.handle = (collection_list_handle_t)(__u64)(p->pgls_cookie);
         vector<sobject_t> sentries;
-       result = osd->store->collection_list_partial(op->get_pg().to_coll(), sentries, p->length,
+       result = osd->store->collection_list_partial(op->get_pg().to_coll(), op->get_snapid(), sentries, p->length,
                                                     &response.handle);
        if (!result) {
           vector<sobject_t>::iterator iter;
index 0124e1027afc251798b651029f4ac93c7fdf1589..a95c54fff10178231128961ed21b63ce03eabe05 100644 (file)
@@ -21,6 +21,7 @@
 
 void buf_to_hex(const unsigned char *buf, int len, char *str)
 {
+  str[0] = '\0';
   for (int i = 0; i < len; i++) {
     sprintf(&str[i*2], "%02x", (int)buf[i]);
   }
@@ -49,7 +50,10 @@ int main(int argc, const char **argv)
   cout << "open pool result = " << r << " pool = " << pool << std::endl;
 
   rados.write(pool, oid, 0, bl, bl.length());
-
+  rados.write(pool, oid, 0, bl, bl.length() - 1);
+  rados.write(pool, oid, 0, bl, bl.length() - 2);
+  rados.write(pool, oid, 0, bl, bl.length() - 3);
+  rados.write(pool, oid, 0, bl, bl.length() - 4);
   r = rados.exec(pool, oid, "crypto", "md5", bl, bl2);
   cout << "exec returned " << r << std::endl;
   const unsigned char *md5 = (const unsigned char *)bl2.c_str();