git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test: add test_filestore_idempotent2
author Samuel Just <rexludorum@gmail.com>
Wed, 7 Mar 2012 19:29:52 +0000 (11:29 -0800)
committer Samuel Just <samuel.just@dreamhost.com>
Mon, 12 Mar 2012 21:58:26 +0000 (14:58 -0700)
Signed-off-by: Samuel Just <rexludorum@gmail.com>
src/Makefile.am
src/test/common/ObjectContents.cc [new file with mode: 0644]
src/test/common/ObjectContents.h [new file with mode: 0644]
src/test/filestore_test/FileStoreTracker.cc [new file with mode: 0644]
src/test/filestore_test/FileStoreTracker.h [new file with mode: 0644]
src/test/filestore_test/test_idempotent.cc [new file with mode: 0644]

index 6e728e66264b262f121a12b34ad88d97a65694ca..a6e6a957d68adc1dca23f35de8871d21c61b4534 100644 (file)
@@ -189,7 +189,10 @@ streamtest_SOURCES = streamtest.cc
 streamtest_LDADD = libos.la leveldb/libleveldb.a $(LIBGLOBAL_LDA)
 test_filestore_idempotent_SOURCES = test/test_filestore_idempotent.cc
 test_filestore_idempotent_LDADD = libos.la leveldb/libleveldb.a $(LIBGLOBAL_LDA)
-bin_DEBUGPROGRAMS += dupstore streamtest test_filestore_idempotent
+test_filestore_idempotent2_SOURCES = test/filestore_test/test_idempotent.cc test/filestore_test/FileStoreTracker.cc test/common/ObjectContents.cc
+test_filestore_idempotent2_LDADD = libos.la leveldb/libleveldb.a $(LIBGLOBAL_LDA)
+test_filestore_idempotent2_CXXFLAGS = -I$(top_srcdir)/src/leveldb/include
+bin_DEBUGPROGRAMS += dupstore streamtest test_filestore_idempotent test_filestore_idempotent2
 
 test_trans_SOURCES = test_trans.cc
 test_trans_LDADD = libos.la leveldb/libleveldb.a $(LIBGLOBAL_LDA)
diff --git a/src/test/common/ObjectContents.cc b/src/test/common/ObjectContents.cc
new file mode 100644 (file)
index 0000000..3b7e1f3
--- /dev/null
@@ -0,0 +1,128 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#include "ObjectContents.h"
+#include "include/buffer.h"
+#include <iostream>
+#include <map>
+
+bool test_object_contents()
+{
+  ObjectContents c, d;
+  assert(!c.exists());
+  c.debug(std::cerr);
+  c.write(10, 10, 10);
+  assert(c.exists());
+  assert(c.size() == 20);
+
+  c.debug(std::cerr);
+  bufferlist bl;
+  for (ObjectContents::Iterator iter = c.get_iterator();
+       iter.valid();
+       ++iter) {
+    bl.append(*iter);
+  }
+  assert(bl.length() == 20);
+
+  bufferlist bl2;
+  for (unsigned i = 0; i < 8; ++i) bl2.append(bl[i]);
+  c.write(10, 8, 4);
+  c.debug(std::cerr);
+  ObjectContents::Iterator iter = c.get_iterator();
+  iter.seek_to(8);
+  for (uint64_t i = 8;
+       i < 12;
+       ++i, ++iter) {
+    bl2.append(*iter);
+  }
+  for (unsigned i = 12; i < 20; ++i) bl2.append(bl[i]);
+  assert(bl2.length() == 20);
+
+  for (ObjectContents::Iterator iter3 = c.get_iterator();
+       iter.valid();
+       ++iter) {
+    assert(bl2[iter3.get_pos()] == *iter3);
+  }
+
+  assert(bl2[0] == '\0');
+  assert(bl2[7] == '\0');
+
+  interval_set<uint64_t> to_clone;
+  to_clone.insert(5, 10);
+  d.clone_range(c, to_clone);
+  assert(d.size() == 15);
+
+  c.debug(std::cerr);
+  d.debug(std::cerr);
+
+  ObjectContents::Iterator iter2 = d.get_iterator();
+  iter2.seek_to(5);
+  for (uint64_t i = 5; i < 15; ++i, ++iter2) {
+    std::cerr << "i is " << i << std::endl;
+    assert(iter2.get_pos() == i);
+    assert(*iter2 == bl2[i]);
+  }
+  return true;
+}
+
+
+// Returns the generator state that applies at offset _pos: the seed
+// recorded at exactly _pos if there is one, otherwise the state this
+// iterator holds after seeking to _pos - 1.
+unsigned int ObjectContents::Iterator::get_state(uint64_t _pos)
+{
+  map<uint64_t, unsigned int>::iterator seeded = parent->seeds.find(_pos);
+  if (seeded != parent->seeds.end())
+    return seeded->second;
+
+  seek_to(_pos - 1);
+  return current_state;
+}
+
+// Models cloning `intervals` from `other` into this object at the same
+// offsets.  Parts of `intervals` that `other` has written take over
+// other's seeds; parts it has not written become unwritten here.
+// Grows _size to cover the range and marks the object as existing.
+void ObjectContents::clone_range(ObjectContents &other,
+                                interval_set<uint64_t> &intervals)
+{
+  // Portion of the requested range that actually holds data in `other`.
+  interval_set<uint64_t> written_to_clone;
+  written_to_clone.intersection_of(intervals, other.written);
+
+  // Remainder of the requested range: not written in `other`.
+  interval_set<uint64_t> zeroed = intervals;
+  zeroed.subtract(written_to_clone);
+
+  // written := (written + intervals) - zeroed
+  written.union_of(intervals);
+  written.subtract(zeroed);
+
+  for (interval_set<uint64_t>::iterator i = written_to_clone.begin();
+       i != written_to_clone.end();
+       ++i) {
+    uint64_t start = i.get_start();
+    uint64_t len = i.get_len();
+
+    // Capture our own state at the end of the extent before any seeds
+    // inside it are dropped, and pin it as a seed at start+len.
+    unsigned int seed = get_iterator().get_state(start+len);
+
+    seeds[start+len] = seed;
+    seeds.erase(seeds.lower_bound(start), seeds.lower_bound(start+len));
+
+    // Adopt other's state at `start` plus every seed it has strictly
+    // inside (start, start+len).
+    seeds[start] = other.get_iterator().get_state(start);
+    seeds.insert(other.seeds.upper_bound(start),
+                other.seeds.lower_bound(start+len));
+  }
+
+  if (intervals.range_end() > _size)
+    _size = intervals.range_end();
+  _exists = true;
+  return;
+}
+
+void ObjectContents::write(unsigned int seed,
+                          uint64_t start,
+                          uint64_t len)
+{
+  _exists = true;
+  unsigned int _seed = get_iterator().get_state(start+len);
+  seeds[start+len] = _seed;
+  seeds.erase(seeds.lower_bound(start),
+             seeds.lower_bound(start+len));
+  seeds[start] = seed;
+
+  interval_set<uint64_t> to_write;
+  to_write.insert(start, len);
+  written.union_of(to_write);
+
+  if (start + len > _size)
+    _size = start + len;
+  return;
+}
diff --git a/src/test/common/ObjectContents.h b/src/test/common/ObjectContents.h
new file mode 100644 (file)
index 0000000..f7b97a6
--- /dev/null
@@ -0,0 +1,121 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#include "include/interval_set.h"
+#include "include/buffer.h"
+#include <map>
+
+#ifndef COMMON_OBJECT_H
+#define COMMON_OBJECT_H
+
+// Operation-kind tags.
+// NOTE(review): none of these enumerators is referenced in the sources
+// visible in this commit -- confirm where they are used.
+enum {
+  RANDOMWRITEFULL,
+  DELETED,
+  CLONERANGE
+};
+
+bool test_object_contents();
+
+// Compact model of an object's byte contents.  Instead of storing data
+// it keeps a map of offset -> rand_r() seed/state plus the set of written
+// extents; an Iterator regenerates the bytes on demand.
+class ObjectContents {
+  uint64_t _size;                       // logical object size in bytes
+  map<uint64_t, unsigned int> seeds;    // offset -> rand_r state taking effect there
+  interval_set<uint64_t> written;       // extents that hold generated data
+  bool _exists;                         // set by write()/clone_range()
+public:
+  // Forward iterator over the modeled bytes of an ObjectContents.
+  class Iterator {
+    ObjectContents *parent;
+    // Next seed entry that has not been applied yet.
+    map<uint64_t, unsigned int>::iterator iter;
+    unsigned int current_state;   // rand_r state after producing current_val
+    int current_val;              // rand_r output for the current position
+    uint64_t pos;                 // current offset
+  private:
+    unsigned int get_state(uint64_t pos);
+  public:
+    // Starts positioned at offset 0.  pos is initialised to (uint64_t)-1
+    // so that the first seek_to() takes the repositioning branch.
+    Iterator(ObjectContents *parent) :
+      parent(parent), iter(parent->seeds.end()),
+      current_state(0), current_val(0), pos(-1) {
+      seek_to_first();
+    }
+    // Byte at the current offset: generated data if the offset is inside
+    // a written extent, otherwise '\0'.
+    char operator*() {
+      return parent->written.contains(pos) ?
+       static_cast<char>(current_val % 256) : '\0';
+    }
+    uint64_t get_pos() {
+      return pos;
+    }
+    // Moves to _pos.  If the iterator is already past _pos, or _pos lies
+    // at/after the next unapplied seed, it first jumps to the last seed at
+    // or before _pos; it then steps forward one byte at a time.
+    void seek_to(uint64_t _pos) {
+      if (pos > _pos ||
+         iter != parent->seeds.end() && _pos >= iter->first) {
+       iter = parent->seeds.upper_bound(_pos);
+       --iter;
+       current_state = iter->second;
+       current_val = rand_r(&current_state);
+       pos = iter->first;
+       ++iter;
+      }
+      while (pos < _pos) ++(*this);
+    }
+
+    void seek_to_first() {
+      seek_to(0);
+    }
+    // Advances one byte, switching to the next seed when its offset is
+    // reached, and draws the value for the new position.
+    Iterator &operator++() {
+      ++pos;
+      if (iter != parent->seeds.end() && pos >= iter->first) {
+       assert(pos == iter->first);
+       current_state = iter->second;
+       ++iter;
+      }
+      current_val = rand_r(&current_state);
+      return *this;
+    }
+    // True while the current offset is inside the object.
+    bool valid() {
+      return pos < parent->size();
+    }
+    friend class ObjectContents;
+  };
+
+  // Empty, non-existent object with a single seed of 0 at offset 0.
+  ObjectContents() : _size(0), _exists(false) {
+    seeds[0] = 0;
+  }
+
+  // Decodes an object previously serialised with encode(); fields are read
+  // in the same order encode() writes them.
+  ObjectContents(bufferlist::iterator &bp) {
+    ::decode(_size, bp);
+    ::decode(seeds, bp);
+    ::decode(written, bp);
+    ::decode(_exists, bp);
+  }
+
+  void clone_range(ObjectContents &other,
+                  interval_set<uint64_t> &intervals);
+  void write(unsigned int seed,
+            uint64_t from,
+            uint64_t len);
+  Iterator get_iterator() {
+    return Iterator(this);
+  }
+
+  uint64_t size() const { return _size; }
+
+  bool exists() { return _exists; }
+
+  // Dumps size, seed map, written extents and the exists flag to `out`.
+  void debug(std::ostream &out) {
+    out << "_size is " << _size << std::endl;
+    out << "seeds is: (";
+    for (map<uint64_t, unsigned int>::iterator i = seeds.begin();
+        i != seeds.end();
+        ++i) {
+      out << "[" << i->first << "," << i->second << "], ";
+    }
+    out << ")" << std::endl;
+    out << "written is " << written << std::endl;
+    out << "_exists is " << _exists << std::endl;
+  }
+
+  // Serialises the object; the field order must match the decoding
+  // constructor.
+  void encode(bufferlist &bl) const {
+    ::encode(_size, bl);
+    ::encode(seeds, bl);
+    ::encode(written, bl);
+    ::encode(_exists, bl);
+  }
+};
+
+#endif
diff --git a/src/test/filestore_test/FileStoreTracker.cc b/src/test/filestore_test/FileStoreTracker.cc
new file mode 100644 (file)
index 0000000..2777a96
--- /dev/null
@@ -0,0 +1,452 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+#include "FileStoreTracker.h"
+#include <stdlib.h>
+#include <iostream>
+#include <boost/scoped_ptr.hpp>
+#include "include/Context.h"
+#include "common/Mutex.h"
+
+class OnApplied : public Context {
+  FileStoreTracker *tracker;
+  list<pair<pair<string, string>, uint64_t> > in_flight;
+  ObjectStore::Transaction *t;
+public:
+  OnApplied(FileStoreTracker *tracker,
+           list<pair<pair<string, string>, uint64_t> > in_flight,
+           ObjectStore::Transaction *t)
+    : tracker(tracker), in_flight(in_flight), t(t) {}
+
+  void finish(int r) {
+    for (list<pair<pair<string, string>, uint64_t> >::iterator i =
+          in_flight.begin();
+        i != in_flight.end();
+        ++i) {
+      tracker->applied(i->first, i->second);
+    }
+    delete t;
+  }
+};
+
+class OnCommitted : public Context {
+  FileStoreTracker *tracker;
+  list<pair<pair<string, string>, uint64_t> > in_flight;
+public:
+  OnCommitted(FileStoreTracker *tracker,
+             list<pair<pair<string, string>, uint64_t> > in_flight)
+    : tracker(tracker), in_flight(in_flight) {}
+
+  void finish(int r) {
+    for (list<pair<pair<string, string>, uint64_t> >::iterator i =
+          in_flight.begin();
+        i != in_flight.end();
+        ++i) {
+      tracker->committed(i->first, i->second);
+    }
+  }
+};
+
+// Loads the restart counter stored under prefix "STATUS" / key "STATUS"
+// (0 if absent), increments it and writes the new value back.
+// Always returns 0.
+int FileStoreTracker::init()
+{
+  set<string> keys;
+  keys.insert("STATUS");
+  map<string, bufferlist> stored;
+  db->get("STATUS", keys, &stored);
+
+  restart_seq = 0;
+  if (!stored.empty()) {
+    bufferlist::iterator p = stored.begin()->second.begin();
+    ::decode(restart_seq, p);
+  }
+  ++restart_seq;
+
+  KeyValueDB::Transaction t = db->get_transaction();
+  map<string, bufferlist> updated;
+  ::encode(restart_seq, updated["STATUS"]);
+  t->set("STATUS", updated);
+  db->submit_transaction(t);
+  return 0;
+}
+
+// Translates every op in `t` into a freshly allocated
+// ObjectStore::Transaction (recording the new object versions as it goes)
+// and queues it on the store with OnApplied / OnCommitted callbacks.
+// The OnApplied callback deletes the ObjectStore transaction.
+void FileStoreTracker::submit_transaction(Transaction &t)
+{
+  list<pair<pair<string, string>, uint64_t> > in_flight;
+  OutTransaction out;
+  out.t = new ObjectStore::Transaction;
+  out.in_flight = &in_flight;
+
+  list<Transaction::Op*>::iterator op = t.ops.begin();
+  while (op != t.ops.end()) {
+    (**op)(this, &out);
+    ++op;
+  }
+
+  store->queue_transaction(
+    0, out.t,
+    new OnApplied(this, in_flight, out.t),
+    new OnCommitted(this, in_flight));
+}
+
+// Adds a random write on `obj` to the outgoing transaction: picks an
+// offset and length below SIZE/2, applies the write to the modeled
+// contents with a random seed, materialises those bytes into a buffer for
+// the store, and records the new content version as in flight.
+void FileStoreTracker::write(const pair<string, string> &obj,
+                            OutTransaction *out)
+{
+  Mutex::Locker l(lock);
+  std::cerr << "Writing " << obj << std::endl;
+  ObjectContents contents = get_current_content(obj);
+
+  uint64_t offset = rand() % (SIZE/2);
+  uint64_t len = rand() % (SIZE/2);
+  // A zero-length draw is replaced by a 10 byte write.
+  if (!len) len = 10;
+  contents.write(rand(), offset, len);
+
+  // Generate exactly the bytes the model now expects in [offset, offset+len).
+  bufferlist to_write;
+  ObjectContents::Iterator iter = contents.get_iterator();
+  iter.seek_to(offset);
+  for (uint64_t i = offset;
+       i < offset + len;
+       ++i, ++iter) {
+    assert(iter.valid());
+    to_write.append(*iter);
+  }
+  out->t->write(coll_t(obj.first),
+               hobject_t(sobject_t(obj.second, CEPH_NOSNAP)),
+               offset,
+               len,
+               to_write);
+  // set_content() persists the model and returns its new version number.
+  out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
+}
+
+void FileStoreTracker::remove(const pair<string, string> &obj,
+                             OutTransaction *out)
+{
+  std::cerr << "Deleting " << obj << std::endl;
+  Mutex::Locker l(lock);
+  ObjectContents old_contents = get_current_content(obj);
+  if (!old_contents.exists())
+    return;
+  out->t->remove(coll_t(obj.first),
+                hobject_t(sobject_t(obj.second, CEPH_NOSNAP)));
+  ObjectContents contents;
+  out->in_flight->push_back(make_pair(obj, set_content(obj, contents)));
+}
+
+void FileStoreTracker::clone_range(const pair<string, string> &from,
+                                  const pair<string, string> &to,
+                                  OutTransaction *out) {
+  Mutex::Locker l(lock);
+  std::cerr << "CloningRange " << from << " to " << to << std::endl;
+  assert(from.first == to.first);
+  ObjectContents from_contents = get_current_content(from);
+  ObjectContents to_contents = get_current_content(to);
+  if (!from_contents.exists()) {
+    return;
+  }
+  if (from.second == to.second) {
+    return;
+  }
+
+  uint64_t new_size = from_contents.size();
+  interval_set<uint64_t> interval_to_clone;
+  uint64_t offset = rand() % (new_size/2);
+  uint64_t len = rand() % (new_size/2);
+  if (!len) len = 10;
+  interval_to_clone.insert(offset, len);
+  to_contents.clone_range(from_contents, interval_to_clone);
+  out->t->clone_range(coll_t(from.first),
+                     hobject_t(sobject_t(from.second, CEPH_NOSNAP)),
+                     hobject_t(sobject_t(to.second, CEPH_NOSNAP)),
+                     offset,
+                     len,
+                     offset);
+  out->in_flight->push_back(make_pair(to, set_content(to, to_contents)));
+}
+
+void FileStoreTracker::clone(const pair<string, string> &from,
+                            const pair<string, string> &to,
+                            OutTransaction *out) {
+  Mutex::Locker l(lock);
+  std::cerr << "Cloning " << from << " to " << to << std::endl;
+  assert(from.first == to.first);
+  if (from.second == to.second) {
+    return;
+  }
+  ObjectContents from_contents = get_current_content(from);
+  ObjectContents to_contents = get_current_content(to);
+  if (!from_contents.exists()) {
+    return;
+  }
+
+  if (to_contents.exists())
+    out->t->remove(coll_t(to.first),
+                  hobject_t(sobject_t(to.second, CEPH_NOSNAP)));
+  out->t->clone(coll_t(from.first),
+               hobject_t(sobject_t(from.second, CEPH_NOSNAP)),
+               hobject_t(sobject_t(to.second, CEPH_NOSNAP)));
+  out->in_flight->push_back(make_pair(to, set_content(to, from_contents)));
+}
+
+
+// Key prefix under which an object's content versions are stored:
+// "<first>^<second>_CONTENTS_".
+string obj_to_prefix(const pair<string, string> &obj) {
+  string prefix(obj.first);
+  prefix += '^';
+  prefix += obj.second;
+  prefix += "_CONTENTS_";
+  return prefix;
+}
+
+// Key prefix under which an object's status record is stored:
+// "<first>^<second>".
+string obj_to_meta_prefix(const pair<string, string> &obj) {
+  string prefix(obj.first);
+  prefix += '^';
+  prefix += obj.second;
+  return prefix;
+}
+
+// Formats a sequence number as a right-aligned decimal in a field of
+// width 20, for use as a database key.
+string seq_to_key(uint64_t seq) {
+  const int width = 20;
+  char buf[50];
+  unsigned long long int value = (unsigned long long int)seq;
+  snprintf(buf, sizeof(buf), "%*llu", width, value);
+  return string(buf);
+}
+
+struct ObjStatus {
+  uint64_t last_applied;
+  uint64_t last_committed;
+  uint64_t restart_seq;
+  ObjStatus() : last_applied(0), last_committed(0), restart_seq(0) {}
+
+  uint64_t get_last_applied(uint64_t seq) const {
+    if (seq > restart_seq)
+      return last_committed;
+    else
+      return last_applied;
+  }
+  void set_last_applied(uint64_t _last_applied, uint64_t seq) {
+    last_applied = _last_applied;
+    restart_seq = seq;
+  }
+  uint64_t trim_to() const {
+    return last_applied < last_committed ?
+      last_applied : last_committed;
+  }
+};
+// Serialises an ObjStatus; the field order must match decode() below.
+void encode(const ObjStatus &obj, bufferlist &bl) {
+  ::encode(obj.last_applied, bl);
+  ::encode(obj.last_committed, bl);
+  ::encode(obj.restart_seq, bl);
+}
+// Deserialises an ObjStatus, reading last_applied, last_committed and
+// restart_seq in that order.
+void decode(ObjStatus &obj, bufferlist::iterator &bl) {
+  ::decode(obj.last_applied, bl);
+  ::decode(obj.last_committed, bl);
+  ::decode(obj.restart_seq, bl);
+}
+
+
+// Reads the "META" record stored under the object's meta prefix and
+// decodes it; returns a default-constructed ObjStatus if none is stored.
+ObjStatus get_obj_status(const pair<string, string> &obj,
+                        KeyValueDB *db)
+{
+  set<string> keys;
+  keys.insert("META");
+  map<string, bufferlist> found;
+  db->get(obj_to_meta_prefix(obj), keys, &found);
+
+  ObjStatus status;
+  if (found.empty())
+    return status;
+
+  bufferlist::iterator p = found.begin()->second.begin();
+  ::decode(status, p);
+  return status;
+}
+
+// Stages a write of `status` as the "META" record under the object's
+// meta prefix in transaction `t`; the caller submits the transaction.
+void set_obj_status(const pair<string, string> &obj,
+                   const ObjStatus &status,
+                   KeyValueDB::Transaction t)
+{
+  map<string, bufferlist> record;
+  ::encode(status, record["META"]);
+  const string prefix = obj_to_meta_prefix(obj);
+  t->set(prefix, record);
+}
+
+// Deletes every stored content version of `obj` that the iterator yields
+// after positioning with upper_bound(seq_to_key(last_valid)), and submits
+// the removal.
+void _clean_forward(const pair<string, string> &obj,
+                   uint64_t last_valid,
+                   KeyValueDB *db)
+{
+  KeyValueDB::Transaction t = db->get_transaction();
+  KeyValueDB::Iterator cur = db->get_iterator(obj_to_prefix(obj));
+  set<string> stale;
+
+  cur->upper_bound(seq_to_key(last_valid));
+  while (cur->valid()) {
+    stale.insert(cur->key());
+    cur->next();
+  }
+
+  t->rmkeys(obj_to_prefix(obj), stale);
+  db->submit_transaction(t);
+}
+
+
+// Reads the object from the store and checks that it matches at least one
+// of the modeled content versions in the range returned by
+// get_valid_reads().  Asserts (aborts) if no version matches.  When
+// on_start is true and a match is found, stored versions newer than the
+// matching one are discarded via _clean_forward().
+void FileStoreTracker::verify(const string &coll, const string &obj,
+                             bool on_start) {
+  Mutex::Locker l(lock);
+  std::cerr << "Verifying " << make_pair(coll, obj) << std::endl;
+
+  pair<uint64_t, uint64_t> valid_reads = get_valid_reads(make_pair(coll, obj));
+  std::cerr << "valid_reads is " << valid_reads << std::endl;
+  bufferlist contents;
+  // Request up to 2*SIZE bytes from offset 0.
+  int r = store->read(coll_t(coll),
+                     hobject_t(sobject_t(obj, CEPH_NOSNAP)),
+                     0,
+                     2*SIZE,
+                     contents);
+  std::cerr << "exists: " << r << std::endl;
+
+
+  // Try each candidate version in [first, second).
+  for (uint64_t i = valid_reads.first;
+       i < valid_reads.second;
+       ++i) {
+    ObjectContents old_contents = get_content(make_pair(coll, obj), i);
+
+    std::cerr << "old_contents exists " << old_contents.exists() << std::endl;
+    // Model and store agree that the object does not exist: verified.
+    if (!old_contents.exists() && (r == -ENOENT))
+      return;
+
+    // Existence disagrees with this version: try the next one.
+    if (old_contents.exists() && (r == -ENOENT))
+      continue;
+
+    if (!old_contents.exists() && (r != -ENOENT))
+      continue;
+
+    if (contents.length() != old_contents.size()) {
+      std::cerr << "old_contents.size() is "
+               << old_contents.size() << std::endl;
+      continue;
+    }
+
+    // Sizes match; compare byte by byte.
+    bufferlist::iterator bp = contents.begin();
+    ObjectContents::Iterator iter = old_contents.get_iterator();
+    iter.seek_to_first();
+    bool matches = true;
+    uint64_t pos = 0;
+    for (; !bp.end() && iter.valid();
+        ++iter, ++bp, ++pos) {
+      if (*iter != *bp) {
+       std::cerr << "does not match at pos " << pos << std::endl;
+       matches = false;
+       break;
+      }
+    }
+    if (matches) {
+      if (on_start)
+       _clean_forward(make_pair(coll, obj), i, db);
+      return;
+    }
+  }
+  // No candidate version matched what the store returned.
+  std::cerr << "Verifying " << make_pair(coll, obj) << " failed " << std::endl;
+  assert(0);
+}
+
+// Returns the most recently stored content version for `obj`, or a
+// default-constructed ObjectContents if none is stored.
+ObjectContents FileStoreTracker::get_current_content(
+  const pair<string, string> &obj)
+{
+  KeyValueDB::Iterator newest = db->get_iterator(obj_to_prefix(obj));
+  newest->seek_to_last();
+  if (!newest->valid())
+    return ObjectContents();
+
+  bufferlist raw = newest->value();
+  bufferlist::iterator p = raw.begin();
+  pair<uint64_t, bufferlist> versioned;
+  ::decode(versioned, p);
+  // The version embedded in the value must agree with the key.
+  assert(seq_to_key(versioned.first) == newest->key());
+  p = versioned.second.begin();
+  return ObjectContents(p);
+}
+
+// Returns the stored content of `obj` at exactly `version`, or a
+// default-constructed ObjectContents if that version is not stored.
+ObjectContents FileStoreTracker::get_content(
+  const pair<string, string> &obj, uint64_t version)
+{
+  set<string> keys;
+  keys.insert(seq_to_key(version));
+  map<string, bufferlist> found;
+  db->get(obj_to_prefix(obj), keys, &found);
+  if (found.empty())
+    return ObjectContents();
+
+  pair<uint64_t, bufferlist> versioned;
+  bufferlist::iterator p = found.begin()->second.begin();
+  ::decode(versioned, p);
+  p = versioned.second.begin();
+  assert(versioned.first == version);
+  return ObjectContents(p);
+}
+
+pair<uint64_t, uint64_t> FileStoreTracker::get_valid_reads(
+  const pair<string, string> &obj)
+{
+  pair<uint64_t, uint64_t> bounds = make_pair(0,1);
+  KeyValueDB::Iterator iter = db->get_iterator(
+    obj_to_prefix(obj));
+  iter->seek_to_last();
+  if (iter->valid()) {
+    pair<uint64_t, bufferlist> val;
+    bufferlist bl = iter->value();
+    bufferlist::iterator bp = bl.begin();
+    ::decode(val, bp);
+    bounds.second = val.first + 1;
+  }
+
+  ObjStatus obj_status = get_obj_status(obj, db);
+  bounds.first = obj_status.get_last_applied(restart_seq);
+  return bounds;
+}
+
+void clear_obsolete(const pair<string, string> &obj,
+                   const ObjStatus &status,
+                   KeyValueDB *db,
+                   KeyValueDB::Transaction t)
+{
+  KeyValueDB::Iterator iter = db->get_iterator(obj_to_prefix(obj));
+  set<string> to_remove;
+  iter->seek_to_first();
+  for (; iter->valid() && iter->key() < seq_to_key(status.trim_to());
+       iter->next())
+    to_remove.insert(iter->key());
+  t->rmkeys(obj_to_prefix(obj), to_remove);
+}
+
+// Records that version `seq` of `obj` has been committed: updates
+// last_committed in the object's status, trims versions that are now
+// obsolete, and persists both changes in one database transaction.
+void FileStoreTracker::committed(const pair<string, string> &obj,
+                                uint64_t seq) {
+  Mutex::Locker l(lock);
+  ObjStatus status = get_obj_status(obj, db);
+  // Commit notifications must arrive in increasing version order.
+  assert(status.last_committed < seq);
+  status.last_committed = seq;
+  KeyValueDB::Transaction t = db->get_transaction();
+  clear_obsolete(obj, status, db, t);
+  set_obj_status(obj, status, t);
+  db->submit_transaction(t);
+}
+
+// Records that version `seq` of `obj` has been applied: updates
+// last_applied (tagged with the current restart_seq) in the object's
+// status, trims versions that are now obsolete, and persists both changes
+// in one database transaction.
+void FileStoreTracker::applied(const pair<string, string> &obj,
+                              uint64_t seq) {
+  Mutex::Locker l(lock);
+  std::cerr << "Applied " << obj << " version " << seq << std::endl;
+  ObjStatus status = get_obj_status(obj, db);
+  // Apply notifications must arrive in increasing version order.
+  assert(status.last_applied < seq);
+  status.set_last_applied(seq, restart_seq);
+  KeyValueDB::Transaction t = db->get_transaction();
+  clear_obsolete(obj, status, db, t);
+  set_obj_status(obj, status, t);
+  db->submit_transaction(t);
+}
+
+
+uint64_t FileStoreTracker::set_content(const pair<string, string> &obj,
+                                      ObjectContents &content) {
+  KeyValueDB::Transaction t = db->get_transaction();
+  KeyValueDB::Iterator iter = db->get_iterator(
+    obj_to_prefix(obj));
+  iter->seek_to_last();
+  uint64_t most_recent = 0;
+  if (iter->valid()) {
+    pair<uint64_t, bufferlist> val;
+    bufferlist bl = iter->value();
+    bufferlist::iterator bp = bl.begin();
+    ::decode(val, bp);
+    most_recent = val.first;
+  }
+  bufferlist buf_content;
+  content.encode(buf_content);
+  map<string, bufferlist> to_set;
+  ::encode(make_pair(most_recent + 1, buf_content),
+          to_set[seq_to_key(most_recent + 1)]);
+  t->set(obj_to_prefix(obj), to_set);
+  db->submit_transaction(t);
+  return most_recent + 1;
+}
diff --git a/src/test/filestore_test/FileStoreTracker.h b/src/test/filestore_test/FileStoreTracker.h
new file mode 100644 (file)
index 0000000..d70e54a
--- /dev/null
@@ -0,0 +1,138 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+
+#ifndef FILESTORE_TRACKER_H
+#define FILESTORE_TRACKER_H
+#include "test/common/ObjectContents.h"
+#include "os/FileStore.h"
+#include "os/KeyValueDB.h"
+#include <boost/scoped_ptr.hpp>
+#include <list>
+#include <map>
+#include "common/Mutex.h"
+
+class FileStoreTracker {
+  const static uint64_t SIZE = 4 * 1024;
+  ObjectStore *store;
+  KeyValueDB *db;
+  Mutex lock;
+  uint64_t restart_seq;
+
+  struct OutTransaction {
+    list<pair<pair<string, string>, uint64_t> > *in_flight;
+    ObjectStore::Transaction *t;
+  };
+public:
+  FileStoreTracker(ObjectStore *store, KeyValueDB *db)
+    : store(store), db(db),
+      lock("Tracker Lock"), restart_seq(0) {}
+
+  class Transaction {
+    class Op {
+    public:
+      virtual void operator()(FileStoreTracker *harness,
+                             OutTransaction *out) = 0;
+      virtual ~Op() {};
+    };
+    list<Op*> ops;
+    class Write : public Op {
+    public:
+      string coll;
+      string oid;
+      Write(const string &coll,
+           const string &oid)
+       : coll(coll), oid(oid) {}
+      void operator()(FileStoreTracker *harness,
+                     OutTransaction *out) {
+       harness->write(make_pair(coll, oid), out);
+      }
+    };
+    class CloneRange : public Op {
+    public:
+      string coll;
+      string from;
+      string to;
+      CloneRange(const string &coll,
+                const string &from,
+                const string &to)
+       : coll(coll), from(from), to(to) {}
+      void operator()(FileStoreTracker *harness,
+                     OutTransaction *out) {
+       harness->clone_range(make_pair(coll, from), make_pair(coll, to),
+                            out);
+      }
+    };
+    class Clone : public Op {
+    public:
+      string coll;
+      string from;
+      string to;
+      Clone(const string &coll,
+                const string &from,
+                const string &to)
+       : coll(coll), from(from), to(to) {}
+      void operator()(FileStoreTracker *harness,
+                     OutTransaction *out) {
+       harness->clone(make_pair(coll, from), make_pair(coll, to),
+                            out);
+      }
+    };
+    class Remove: public Op {
+    public:
+      string coll;
+      string obj;
+      Remove(const string &coll,
+            const string &obj)
+       : coll(coll), obj(obj) {}
+      void operator()(FileStoreTracker *harness,
+                     OutTransaction *out) {
+       harness->remove(make_pair(coll, obj),
+                       out);
+      }
+    };
+  public:
+    void write(const string &coll, const string &oid) {
+      ops.push_back(new Write(coll, oid));
+    }
+    void clone_range(const string &coll, const string &from,
+                    const string &to) {
+      ops.push_back(new CloneRange(coll, from, to));
+    }
+    void clone(const string &coll, const string &from,
+              const string &to) {
+      ops.push_back(new Clone(coll, from, to));
+    }
+    void remove(const string &coll, const string &oid) {
+      ops.push_back(new Remove(coll, oid));
+    }
+    friend class FileStoreTracker;
+  };
+
+  int init();
+  void submit_transaction(Transaction &t);
+  void verify(const string &coll,
+             const string &from,
+             bool on_start = false);
+
+private:
+  ObjectContents get_current_content(const pair<string, string> &obj);
+  pair<uint64_t, uint64_t> get_valid_reads(const pair<string, string> &obj);
+  ObjectContents get_content(const pair<string, string> &obj, uint64_t version);
+
+  void committed(const pair<string, string> &obj, uint64_t seq);
+  void applied(const pair<string, string> &obj, uint64_t seq);
+  uint64_t set_content(const pair<string, string> &obj, ObjectContents &content);
+
+  // ObjectContents Operations
+  void write(const pair<string, string> &obj, OutTransaction *out);
+  void remove(const pair<string, string> &obj, OutTransaction *out);
+  void clone_range(const pair<string, string> &from,
+                  const pair<string, string> &to,
+                  OutTransaction *out);
+  void clone(const pair<string, string> &from,
+            const pair<string, string> &to,
+            OutTransaction *out);
+  friend class OnApplied;
+  friend class OnCommitted;
+};
+
+#endif
diff --git a/src/test/filestore_test/test_idempotent.cc b/src/test/filestore_test/test_idempotent.cc
new file mode 100644 (file)
index 0000000..b8dd829
--- /dev/null
@@ -0,0 +1,114 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include <iostream>
+#include <sstream>
+#include <boost/scoped_ptr.hpp>
+#include "os/FileStore.h"
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+#include "common/debug.h"
+#include "test/common/ObjectContents.h"
+#include "FileStoreTracker.h"
+#include "os/LevelDBStore.h"
+#include "os/KeyValueDB.h"
+#include "os/ObjectStore.h"
+#include "os/FileStore.h"
+
+// Prints the command-line synopsis for program `name` to std::cerr.
+void usage(const string &name) {
+  std::cerr << "Usage: " << name << " [new|continue] store_path store_journal db_path"
+           << std::endl;
+}
+
+// Returns an iterator to an element of `cont` picked with rand(), or
+// cont.end() if the container is empty.
+template <typename T>
+typename T::iterator rand_choose(T &cont) {
+  typename T::iterator pick = cont.end();
+  if (cont.size() != 0) {
+    int steps = rand() % cont.size();
+    pick = cont.begin();
+    while (steps > 0) {
+      pick++;
+      --steps;
+    }
+  }
+  return pick;
+}
+
+int main(int argc, char **argv) {
+  vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+
+  global_init(args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0);
+  common_init_finish(g_ceph_context);
+  g_ceph_context->_conf->apply_changes(NULL);
+
+  std::cerr << "args: " << args << std::endl;
+  if (args.size() < 4) {
+    usage(argv[0]);
+    return 1;
+  }
+
+  string store_path(args[1]);
+  string store_dev(args[2]);
+  string db_path(args[3]);
+
+  bool start_new = false;
+  if (string(args[0]) == string("new")) start_new = true;
+
+  LevelDBStore *_db = new LevelDBStore(db_path);
+  assert(!_db->init(std::cerr));
+  boost::scoped_ptr<KeyValueDB> db(_db);
+  boost::scoped_ptr<ObjectStore> store(new FileStore(store_path, store_dev));
+
+
+  if (start_new) {
+    std::cerr << "mkfs" << std::endl;
+    assert(!store->mkfs());
+    ObjectStore::Transaction t;
+    assert(!store->mount());
+    t.create_collection(coll_t("coll"));
+    store->apply_transaction(t);
+  } else {
+    assert(!store->mount());
+  }
+
+  FileStoreTracker tracker(store.get(), db.get());
+
+  set<string> objects;
+  for (unsigned i = 0; i < 10; ++i) {
+    stringstream stream;
+    stream << "Object_" << i;
+    tracker.verify("coll", stream.str(), true);
+    objects.insert(stream.str());
+  }
+
+  while (1) {
+    FileStoreTracker::Transaction t;
+    for (unsigned j = 0; j < 100; ++j) {
+      int val = rand() % 100;
+      if (val < 30) {
+       t.write("coll", *rand_choose(objects));
+      } else if (val < 60) {
+       t.clone("coll", *rand_choose(objects),
+               *rand_choose(objects));
+      } else if (val < 70) {
+       t.remove("coll", *rand_choose(objects));
+      } else {
+       t.clone_range("coll", *rand_choose(objects),
+                     *rand_choose(objects));
+      }
+    }
+    tracker.submit_transaction(t);
+    tracker.verify("coll", *rand_choose(objects));
+  }
+  return 0;
+}