// get epoch
sprintf(fn, "%s/commit_epoch", basedir.c_str());
fd = ::open(fn, O_RDONLY);
- ::read(fd, &super_epoch, sizeof(super_epoch));
+ ::read(fd, &op_seq, sizeof(op_seq));
::close(fd);
- dout(5) << "mount epoch is " << super_epoch << dendl;
+ dout(5) << "mount op_seq is " << op_seq << dendl;
// journal
sprintf(fn, "%s.journal", basedir.c_str());
// no btrfs transaction support?
// or, use trans start/end ioctls?
- if (!btrfs || btrfs_trans_start_end)
- return ObjectStore::apply_transaction(t, onsafe);
+ if (!btrfs || btrfs_trans_start_end) {
+ bufferlist tbl;
+ t.encode(tbl); // apply_transaction modifies t; encode first
+ op_start();
+ int r = ObjectStore::apply_transaction(t);
+ if (r >= 0)
+ journal_transaction(tbl, onsafe);
+ else
+ delete onsafe;
+ op_finish();
+ return r;
+ }
// create transaction
int len = t.get_btrfs_len();
dout(20) << "remove " << cid << " " << oid << dendl;
char fn[200];
get_coname(cid, oid, fn);
+ op_start();
int r = ::unlink(fn);
if (r == 0)
journal_remove(cid, oid, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
char fn[200];
get_coname(cid, oid, fn);
+ op_start();
int r = ::truncate(fn, size);
if (r >= 0) journal_truncate(cid, oid, size, onsafe);
+ op_finish();
return r < 0 ? -errno:r;
}
dout(10) << "read couldn't open " << fn << " errno " << errno << " " << strerror(errno) << dendl;
return -errno;
}
- ::flock(fd, LOCK_EX); // lock for safety
__u64 actual = lseek(fd, offset, SEEK_SET);
size_t got = 0;
bptr.set_length(got); // properly size the buffer
if (got > 0) bl.push_back( bptr ); // put it in the target bufferlist
}
- ::flock(fd, LOCK_UN);
::close(fd);
return got;
}
derr(0) << "write couldn't open " << fn << " flags " << flags << " errno " << errno << " " << strerror(errno) << dendl;
return -errno;
}
- ::flock(fd, LOCK_EX); // lock for safety
+ op_start();
+
// seek
__u64 actual = ::lseek(fd, offset, SEEK_SET);
int did = 0;
derr(0) << "couldn't write to " << fn << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << dendl;
}
- ::flock(fd, LOCK_UN);
// schedule sync
if (did >= 0)
delete onsafe;
::close(fd);
+
+ op_finish();
return did;
}
int n = ::open(nfn, O_CREAT|O_TRUNC|O_WRONLY, 0644);
if (n < 0)
return -errno;
+
+ op_start();
+
int r = 0;
#ifndef DARWIN
if (btrfs)
}
#endif
}
+
+ op_finish();
+
if (r < 0)
return -errno;
sync_cond.WaitInterval(lock, interval);
lock.Unlock();
- dout(20) << "sync_entry committing " << super_epoch << " " << interval << dendl;
+ dout(20) << "sync_entry committing " << op_seq << " " << interval << dendl;
commit_start();
// induce an fs sync.
char fn[100];
sprintf(fn, "%s/commit_epoch", basedir.c_str());
int fd = ::open(fn, O_CREAT|O_WRONLY, 0644);
- ::write(fd, &super_epoch, sizeof(super_epoch));
+ ::write(fd, &op_seq, sizeof(op_seq));
+
+ commit_started();
+
::fsync(fd); // this should cause the fs's journal to commit. (on btrfs too.)
::close(fd);
commit_finish();
lock.Lock();
- dout(20) << "sync_entry committed " << super_epoch << dendl;
+ dout(20) << "sync_entry committed to op_seq " << op_seq << dendl;
}
lock.Unlock();
}
Context *onsafe)
{
int r;
+ op_start();
if (fake_attrs)
r = attrs.setattr(cid, oid, name, value, size, onsafe);
else {
journal_setattr(cid, oid, name, value, size, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
int FileStore::setattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& aset, Context *onsafe)
{
int r;
+ op_start();
if (fake_attrs)
r = attrs.setattrs(cid, oid, aset);
else {
journal_setattrs(cid, oid, aset, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
int FileStore::rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsafe)
{
int r;
+ op_start();
if (fake_attrs)
r = attrs.rmattr(cid, oid, name, onsafe);
else {
journal_rmattr(cid, oid, name, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
Context *onsafe)
{
int r;
+ op_start();
if (fake_attrs)
r = attrs.collection_setattr(c, name, value, size, onsafe);
else {
journal_collection_setattr(c, name, value, size, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
Context *onsafe)
{
int r;
+ op_start();
if (fake_attrs)
r = attrs.collection_rmattr(c, name, onsafe);
else {
get_cdir(c, fn);
r = do_removexattr(fn, name);
}
+ op_finish();
return r < 0 ? -errno:r;
}
int FileStore::collection_setattrs(coll_t cid, map<string,bufferptr>& aset)
{
int r;
+ op_start();
if (fake_attrs)
r = attrs.collection_setattrs(cid, aset);
else {
}
if (r >= 0)
journal_collection_setattrs(cid, aset, 0);
+ op_finish();
return r < 0 ? -errno:r;
}
{
if (fake_collections) return collections.create_collection(c, onsafe);
+ op_start();
+
char fn[200];
get_cdir(c, fn);
journal_create_collection(c, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
{
if (fake_collections) return collections.destroy_collection(c, onsafe);
+ op_start();
+
char fn[200];
get_cdir(c, fn);
char cmd[200];
journal_destroy_collection(c, onsafe);
else
delete onsafe;
+
+ op_finish();
return 0;
}
Context *onsafe)
{
int r;
+ op_start();
if (fake_collections)
r = collections.collection_add(c, o, onsafe);
else {
journal_collection_add(c, cid, o, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
Context *onsafe)
{
int r;
+ op_start();
if (fake_collections)
r = collections.collection_remove(c, o, onsafe);
else {
journal_collection_remove(c, o, onsafe);
else
delete onsafe;
+ op_finish();
return r < 0 ? -errno:r;
}
class JournalingObjectStore : public ObjectStore {
protected:
- epoch_t super_epoch;
+ __u64 op_seq;
+ __u64 committing_op_seq;
Journal *journal;
Finisher finisher;
map<version_t, vector<Context*> > commit_waiters;
}
void commit_start() {
+ // suspend new ops...
op_lock.get_write();
- super_epoch++;
+ }
+ void commit_started() {
+ // allow new ops
+ // (underlying fs should now be committing all prior ops)
+ committing_op_seq = op_seq;
op_lock.put_write();
}
void commit_finish() {
if (journal)
- journal->committed_thru(super_epoch-1);
- finisher.queue(commit_waiters[super_epoch-1]);
+ journal->committed_thru(committing_op_seq);
+ finisher.queue(commit_waiters[committing_op_seq]);
}
void queue_commit_waiter(Context *oncommit) {
if (oncommit)
- commit_waiters[super_epoch].push_back(oncommit);
+ commit_waiters[op_seq].push_back(oncommit);
}
void journal_transaction(Transaction &t, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
bufferlist tbl;
t.encode(tbl);
- journal->submit_entry(super_epoch, tbl, onsafe);
+ journal->submit_entry(op_seq, tbl, onsafe);
+ } else
+ queue_commit_waiter(onsafe);
+ }
+ void journal_transaction(bufferlist& tbl, Context *onsafe) {
+ ++op_seq;
+ if (journal && journal->is_writeable()) {
+ journal->submit_entry(op_seq, tbl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_write(coll_t cid, pobject_t oid, loff_t off, size_t len, const bufferlist& bl, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.write(cid, oid, off, len, bl);
bufferlist tbl;
t.encode(tbl);
- journal->submit_entry(super_epoch, tbl, onsafe);
+ journal->submit_entry(op_seq, tbl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_zero(coll_t cid, pobject_t oid, loff_t off, size_t len, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.zero(cid, oid, off, len);
bufferlist tbl;
t.encode(tbl);
- journal->submit_entry(super_epoch, tbl, onsafe);
+ journal->submit_entry(op_seq, tbl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_remove(coll_t cid, pobject_t oid, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.remove(cid, oid);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_truncate(coll_t cid, pobject_t oid, loff_t size, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.truncate(cid, oid, size);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_clone(coll_t cid, pobject_t from, pobject_t to, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.clone(cid, from, to);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_setattr(coll_t cid, pobject_t oid, const char *name, const void *value, size_t size, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.setattr(cid, oid, name, value, size);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_setattrs(coll_t cid, pobject_t oid, map<string,bufferptr>& attrset, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.setattrs(cid, oid, attrset);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_rmattr(coll_t cid, pobject_t oid, const char *name, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.rmattr(cid, oid, name);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_create_collection(coll_t cid, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.create_collection(cid);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_destroy_collection(coll_t cid, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.remove_collection(cid);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_collection_add(coll_t cid, coll_t ocid, pobject_t oid, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.collection_add(cid, ocid, oid);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_collection_remove(coll_t cid, pobject_t oid, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.collection_remove(cid, oid);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_collection_setattr(coll_t cid, const char *name, const void *value, size_t size, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.collection_setattr(cid, name, value, size);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_collection_setattrs(coll_t cid, map<string,bufferptr>& aset, Context *onsafe) {
+ ++op_seq;
if (journal && journal->is_writeable()) {
Transaction t;
t.collection_setattrs(cid, aset);
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
void journal_sync(Context *onsafe) {
+ ++op_seq;
if (journal) {
// journal empty transaction
Transaction t;
bufferlist bl;
t.encode(bl);
- journal->submit_entry(super_epoch, bl, onsafe);
+ journal->submit_entry(op_seq, bl, onsafe);
} else
queue_commit_waiter(onsafe);
}
public:
- JournalingObjectStore() : super_epoch(0), journal(0) { }
+ JournalingObjectStore() : op_seq(0), committing_op_seq(0), journal(0) { }
};