OSBDB_OBJ = osbdb.o
endif
-TARGETS = cmon cosd cmds cfuse csyn newsyn fakesyn mkmonmap
+TARGETS = cmon cosd cmds csyn newsyn fakesyn mkmonmap cfuse fakefuse
SRCS=*.cc */*.cc *.h */*.h */*/*.h
// args for fuse
vec_to_argv(args, argc, argv);
+ // FUSE will chdir("/"); be ready.
+ g_conf.use_abspaths = true;
+
// load monmap
MonMap monmap;
int r = monmap.read(".ceph_monmap");
Client::~Client()
{
- if (messenger) { delete messenger; messenger = 0; }
+ tear_down_cache();
+
+ if (objectcacher) {
+ delete objectcacher;
+ objectcacher = 0;
+ }
+
if (filer) { delete filer; filer = 0; }
- if (objectcacher) { delete objectcacher; objectcacher = 0; }
if (objecter) { delete objecter; objecter = 0; }
if (osdmap) { delete osdmap; osdmap = 0; }
+ if (mdsmap) { delete mdsmap; mdsmap = 0; }
if (capcache) { delete capcache; capcache = 0; }
- tear_down_cache();
+ if (messenger) { delete messenger; messenger = 0; }
}
if (cap_reap_queue[in->ino()].empty())
cap_reap_queue.erase(in->ino());
}
+ delete m;
return;
}
} else {
//dout(0) << "didn't put_inode" << endl;
}
-
+ delete m;
return;
}
}
}
in->fc.set_caps(new_caps, onimplement);
-
} else {
// caching off.
dout(7) << "open got caps " << cap_string(new_caps)
<< " for " << f->inode->ino()
<< " seq " << reply->get_file_caps_seq()
- << " from mds" << mds << endl;
+ << " from mds" << mds
+ << endl;
int old_caps = f->inode->caps[mds].caps;
dout(7) << "open got SAME caps " << cap_string(new_caps)
<< " for " << f->inode->ino()
<< " seq " << reply->get_file_caps_seq()
- << " from mds" << mds << endl;
+ << " from mds" << mds
+ << endl;
}
// put in map
// ------------
// read, write
+
+off_t Client::lseek(fh_t fh, off_t offset, int whence)
+{
+ client_lock.Lock();
+ dout(3) << "op: client->lseek(" << fh << ", " << offset << ", " << whence << ");" << endl;
+
+ assert(fh_map.count(fh));
+ Fh *f = fh_map[fh];
+ Inode *in = f->inode;
+
+ switch (whence) {
+ case SEEK_SET:
+ f->pos = offset;
+ break;
+
+ case SEEK_CUR:
+ f->pos += offset;
+ break;
+
+ case SEEK_END:
+ f->pos = in->inode.size + offset;
+ break;
+
+ default:
+ assert(0);
+ }
+
+ off_t pos = f->pos;
+ client_lock.Unlock();
+
+ return pos;
+}
+
+
// blocking osd interface
int Client::read(fh_t fh, char *buf, off_t size, off_t offset,
if (!lazy && (in->file_caps() & (CAP_FILE_WRBUFFER|CAP_FILE_RDCACHE))) {
// we're doing buffered i/o. make sure we're inside the file.
// we can trust size info bc we get accurate info when buffering/caching caps are issued.
- dout(10) << "file size: " << in->inode.size << endl;
+ dout(-10) << "file size: " << in->inode.size << endl;
if (offset > 0 && offset >= in->inode.size) {
client_lock.Unlock();
return 0;
}
- if (offset + size > (unsigned)in->inode.size) size = (unsigned)in->inode.size - offset;
+ if (offset + size > (off_t)in->inode.size)
+ size = (off_t)in->inode.size - offset;
if (size == 0) {
- dout(10) << "read is size=0, returning 0" << endl;
+ dout(-10) << "read is size=0, returning 0" << endl;
client_lock.Unlock();
return 0;
}
}
bufferlist blist; // data will go here
- int rvalue = 0;
int r = 0;
+ int rvalue = 0;
if (g_conf.client_oc) {
// object cache ON
}
+// =========================================
+// layout
+
+
+int Client::describe_layout(int fh, FileLayout *lp)
+{
+ client_lock.Lock();
+ dout(3) << "op: client->describe_layout(" << fh << ");" << endl;
+
+ assert(fh_map.count(fh));
+ Fh *f = fh_map[fh];
+ Inode *in = f->inode;
+
+ *lp = in->inode.layout;
+
+ client_lock.Unlock();
+ return 0;
+}
+
+int Client::get_stripe_unit(int fd)
+{
+ FileLayout layout;
+ describe_layout(fd, &layout);
+ return layout.stripe_size;
+}
+
+int Client::get_stripe_width(int fd)
+{
+ FileLayout layout;
+ describe_layout(fd, &layout);
+ return layout.stripe_size*layout.stripe_count;
+}
+
+int Client::get_stripe_period(int fd)
+{
+ FileLayout layout;
+ describe_layout(fd, &layout);
+ return layout.period();
+}
+
+int Client::enumerate_layout(int fh, list<ObjectExtent>& result,
+ off_t length, off_t offset)
+{
+ client_lock.Lock();
+ dout(3) << "op: client->enumerate_layout(" << fh << ", " << length << ", " << offset << ");" << endl;
+
+ assert(fh_map.count(fh));
+ Fh *f = fh_map[fh];
+ Inode *in = f->inode;
+
+ // map to a list of extents
+ filer->file_to_extents(in->inode, offset, length, result);
+
+ client_lock.Unlock();
+ return 0;
+}
+
+
+
void Client::ms_handle_failure(Message *m, const entity_inst_t& inst)
{
entity_name_t dest = inst.name;
__int64_t uid = -1, __int64_t gid = -1);
int close(fh_t fh,
__int64_t uid = -1, __int64_t gid = -1);
+ off_t lseek(fh_t fh, off_t offset, int whence);
int read(fh_t fh, char *buf, off_t size, off_t offset=-1,
__int64_t uid = -1, __int64_t gid = -1);
int write(fh_t fh, const char *buf, off_t size, off_t offset=-1,
int fsync(fh_t fh, bool syncdataonly,
__int64_t uid = -1, __int64_t gid = -1);
+
// hpc lazyio
int lazyio_propogate(int fd, off_t offset, size_t count,
__int64_t uid = -1, __int64_t gid = -1);
int lazyio_synchronize(int fd, off_t offset, size_t count,
__int64_t uid = -1, __int64_t gid = -1);
- int describe_layout(char *fn, list<ObjectExtent>& result);
+ // expose file layout
+ int describe_layout(int fd, FileLayout* layout);
+ int get_stripe_unit(int fd);
+ int get_stripe_width(int fd);
+ int get_stripe_period(int fd);
+ int enumerate_layout(int fd, list<ObjectExtent>& result,
+ off_t length, off_t offset);
+ // failure
void ms_handle_failure(Message*, const entity_inst_t& inst);
};
}
+void FileCache::tear_down()
+{
+ off_t unclean = release_clean();
+ if (unclean) {
+ dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl;
+ oc->purge_set(inode.ino);
+ }
+}
+
// caps
void FileCache::set_caps(int caps, Context *onimplement)
latest_caps(0),
num_reading(0), num_writing(0),// num_unsafe(0),
waitfor_release(false) {}
+ ~FileCache() {
+ tear_down();
+ }
// waiters/waiting
bool can_read() { return latest_caps & CAP_FILE_RD; }
bool is_cached();
bool is_dirty();
+ void tear_down();
+
int get_caps() { return latest_caps; }
void set_caps(int caps, Context *onimplement=0);
void check_caps();
}
dout(2) << "writing block " << i << "/" << chunks << endl;
- // fill buf with a fingerprint
- int *p = (int*)buf;
+ // fill buf with a 16 byte fingerprint
+ // 64 bits : file offset
+ // 64 bits : client id
+ // = 128 bits (16 bytes)
+ __uint64_t *p = (__uint64_t*)buf;
while ((char*)p < buf + wrsize) {
- *p = (char*)p - buf;
- p++;
- *p = i;
+ *p = i*wrsize + (char*)p - buf;
p++;
*p = client->get_nodeid();
p++;
- *p = 0;
- p++;
}
client->write(fd, buf, wrsize, i*wrsize);
for (unsigned i=0; i<chunks; i++) {
if (time_to_stop()) break;
dout(2) << "reading block " << i << "/" << chunks << endl;
- client->read(fd, buf, rdsize, i*rdsize);
+ int r = client->read(fd, buf, rdsize, i*rdsize);
+ if (r < rdsize) {
+ dout(1) << "read_file got r = " << r << ", probably end of file" << endl;
+ break;
+ }
// verify fingerprint
- int *p = (int*)buf;
int bad = 0;
- int boff, bgoff, bchunk, bclient, bzero;
+ __int64_t *p = (__int64_t*)buf;
+ __int64_t readoff, readclient;
while ((char*)p + 32 < buf + rdsize) {
- boff = *p;
- bgoff = (int)((char*)p - buf);
+ readoff = *p;
+ __int64_t wantoff = i*rdsize + (__int64_t)((char*)p - buf);
p++;
- bchunk = *p;
+ readclient = *p;
p++;
- bclient = *p;
- p++;
- bzero = *p;
- p++;
- if (boff != bgoff ||
- bchunk != (int)i ||
- bclient != client->get_nodeid() ||
- bzero != 0) {
+ if (readoff != wantoff ||
+ readclient != client->get_nodeid()) {
if (!bad)
- dout(0) << "WARNING: wrong data from OSD, it should be "
- << "(block=" << i
- << " offset=" << bgoff
- << " client=" << client->get_nodeid() << ")"
- << " .. but i read back .. "
- << "(block=" << bchunk
- << " offset=" << boff
- << " client=" << bclient << " zero=" << bzero << ")" << endl;
-
+ dout(0) << "WARNING: wrong data from OSD, block says fileoffset=" << readoff << " client=" << readclient
+ << ", should be offset " << wantoff << " clietn " << client->get_nodeid()
+ << endl;
bad++;
}
}
if (bad)
dout(0) << " + " << (bad-1) << " other bad 16-byte bits in this block" << endl;
-
}
client->close(fd);
// go fuse go
cout << "ok, calling fuse_main" << endl;
- return fuse_main(newargc, newargv, &ceph_oper);
+ int r = fuse_main(newargc, newargv, &ceph_oper);
+ return r;
}
{
logger_lock.Lock();
{
+ filename = "";
+ if (g_conf.use_abspaths) {
+ char *cwd = get_current_dir_name();
+ filename = cwd;
+ delete cwd;
+ filename += "/";
+ }
+
filename = "log/";
if (g_conf.log_name) {
filename += g_conf.log_name;
}
int join(void **prval = 0) {
- if (thread_id == 0) return -1; // never started.
+ assert(thread_id);
+ //if (thread_id == 0) return -1; // never started.
+
int status = pthread_join(thread_id, prval);
if (status == 0)
thread_id = 0;
scheduled.erase(tp);
lock.Unlock();
+
+ // delete the canceled event.
+ delete callback;
+
return true;
}
if (g_timer.cancel_event(scheduled[c])) {
// hosed wrapper. hose original event too.
- delete scheduled[c];
+ delete c;
} else {
// clean up later.
canceled[c] = scheduled[c];
debug_after: 0,
+ // -- misc --
+ use_abspaths: false, // make monitorstore et al use absolute path (to workaround FUSE chdir("/"))
+
// --- clock ---
clock_lock: false,
bdbstore_ffactor: 0,
bdbstore_nelem: 0,
bdbstore_pagesize: 0,
- bdbstore_cachesize: 0
+ bdbstore_cachesize: 0,
+ bdbstore_transactional: false
#endif // USE_OSBDB
};
else if (strcmp(args[i], "--bdbstore-cachesize") == 0) {
g_conf.bdbstore_cachesize = atoi(args[++i]);
}
+ else if (strcmp(args[i], "--bdbstore-transactional") == 0) {
+ g_conf.bdbstore_transactional = true;
+ }
+ else if (strcmp(args[i], "--debug-bdbstore") == 0) {
+ g_conf.debug_bdbstore = atoi(args[++i]);
+ }
#endif // USE_OSBDB
else {
int debug_after;
+ // misc
+ bool use_abspaths;
+
// clock
bool clock_lock;
int bdbstore_nelem;
int bdbstore_pagesize;
int bdbstore_cachesize;
+ bool bdbstore_transactional;
#endif // USE_OSBDB
};
#include "osd/OSD.h"
#include "ebofs/Ebofs.h"
-#include "msg/NewMessenger.h"
+#include "msg/SimpleMessenger.h"
#include "common/Timer.h"
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
#ifndef __crush_BINARYTREE_H
#define __crush_BINARYTREE_H
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
#ifndef __crush_BUCKET_H
#define __crush_BUCKET_H
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
// Robert Jenkins' function for mixing 32-bit values
// http://burtleburtle.net/bob/hash/evahash.html
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
#ifndef __crush_CRUSH_H
#define __crush_CRUSH_H
int bucketno;
Hash h;
- hash_map<int, int> parent_map; // what bucket each leaf/bucket lives in
+ hash_map<int, int> parent_map; // what bucket each leaf/bucket lives in
public:
map<int, Rule> rules;
off += sizeof(r);
rules[r]._decode(bl,off);
}
-
- // index
- build_parent_map();
+
+ // index
+ build_parent_map();
}
- void build_parent_map() {
- parent_map.clear();
-
- // index every bucket
- for (map<int, Bucket*>::iterator bp = buckets.begin();
- bp != buckets.end();
- ++bp) {
- // index bucket items
- vector<int> items;
- bp->second->get_items(items);
- for (vector<int>::iterator ip = items.begin();
- ip != items.end();
- ++ip)
- parent_map[*ip] = bp->first;
- }
- }
-
+ void build_parent_map() {
+ parent_map.clear();
+
+ // index every bucket
+ for (map<int, Bucket*>::iterator bp = buckets.begin();
+ bp != buckets.end();
+ ++bp) {
+ // index bucket items
+ vector<int> items;
+ bp->second->get_items(items);
+ for (vector<int>::iterator ip = items.begin();
+ ip != items.end();
+ ++ip)
+ parent_map[*ip] = bp->first;
+ }
+ }
+
public:
vector<int>& outvec,
bool firstn,
set<int>& outset, map<int,float>& overloadmap,
- bool forcefeed=false,
- int forcefeedval=-1) {
+ bool forcefeed=false,
+ int forcefeedval=-1) {
int off = outvec.size();
// for each replica
for (int rep=0; rep<numrep; rep++) {
int outv = -1; // my result
- // forcefeed?
- if (forcefeed) {
- forcefeed = false;
- outvec.push_back(forcefeedval);
- continue;
- }
-
+ // forcefeed?
+ if (forcefeed) {
+ forcefeed = false;
+ outvec.push_back(forcefeedval);
+ continue;
+ }
+
// keep trying until we get a non-out, non-colliding item
int ftotal = 0;
bool skip_rep = false;
-
+
while (1) {
// start with the input bucket
Bucket *in = inbucket;
//int numresult = 0;
result.clear();
- // determine hierarchical context for first.
- list<int> force_stack;
- if (forcefeed >= 0) {
- int t = forcefeed;
- while (1) {
- force_stack.push_front(t);
- if (parent_map.count(t) == 0) break; // reached root, presumably.
- //cout << " " << t << " parent is " << parent_map[t] << endl;
- t = parent_map[t];
- }
- }
-
+ // determine hierarchical context for first.
+ list<int> force_stack;
+ if (forcefeed >= 0) {
+ int t = forcefeed;
+ while (1) {
+ force_stack.push_front(t);
+ if (parent_map.count(t) == 0) break; // reached root, presumably.
+ //cout << " " << t << " parent is " << parent_map[t] << endl;
+ t = parent_map[t];
+ }
+ }
+
// working vector
vector<int> w; // working variable
-
+
// go through each statement
for (vector<RuleStep>::iterator pc = rule.steps.begin();
pc != rule.steps.end();
{
const int arg = pc->args[0];
//cout << "take " << arg << endl;
-
- if (!force_stack.empty()) {
- int forceval = force_stack.front();
- force_stack.pop_front();
- assert(arg == forceval);
- }
-
+
+ if (!force_stack.empty()) {
+ int forceval = force_stack.front();
+ force_stack.pop_front();
+ assert(arg == forceval);
+ }
+
w.clear();
w.push_back(arg);
}
vector<int> out;
// forcefeeding?
- bool forcing = false;
- int forceval;
- if (!force_stack.empty()) {
- forceval = force_stack.front();
- force_stack.pop_front();
- //cout << "priming out with " << forceval << endl;
- forcing = true;
- }
-
+ bool forcing = false;
+ int forceval;
+ if (!force_stack.empty()) {
+ forceval = force_stack.front();
+ force_stack.pop_front();
+ //cout << "priming out with " << forceval << endl;
+ forcing = true;
+ }
+
// do each row independently
for (vector<int>::iterator i = w.begin();
i != w.end();
i++) {
assert(buckets.count(*i));
Bucket *b = buckets[*i];
- choose(x, numrep, type, b, out, firstn,
- outset, overloadmap,
- forcing,
- forceval);
- forcing = false; // only once
+ choose(x, numrep, type, b, out, firstn,
+ outset, overloadmap,
+ forcing,
+ forceval);
+ forcing = false; // only once
} // for inrow
// put back into w
--- /dev/null
+OBJECT STORE ON BERKELEY DB
+---------------------------
+
+OSBDB is an implementation of an object store that uses Berkeley DB as
+the underlying storage. It is meant to be an alternative to EBOFS.
+
+BUILDING
+--------
+
+You will need to have Berkeley DB installed, including the developent
+packages. We've tested this with Berkeley DB 4.4.20 on Ubuntu 6.10.
+
+To compile OSBDB support, you need to pass the argument "want_bdb=yes"
+to "make." If you don't specify this, OSBDB and all its associated
+support is not included in the executables.
+
+RUNNING
+-------
+
+To use OSBDB in Ceph, simply pass the --bdbstore flag to programs. You
+don't need to create a "device" for OSBDB ahead of time; Berkeley DB
+will take care of creating the files. You also *cannot* use a raw
+device as your store -- it must be regular file.
+
+OSBDB additionally accepts the following flags:
+
+ --bdbstore-btree Configures OSBDB to use the "Btree"
+ database type for Berkeley DB. The default
+ database type is "Hash".
+
+ --bdbstore-hash-ffactor Sets the "fill factor" for the hash
+ database type. Takes an integer argument.
+
+ --bdbstore-hash-nelem Sets the "nelem" parameter for the hash
+ database type. Takes an integer argument.
+
+ --bdbstore-hash-pagesize Sets the page size for the hash database
+ type. Takes an integer argument.
+
+ --bdbstore-cachesize Sets the cache size. Takes an integer
+ argument, which must be a power of two, and
+ no less than 20 KiB.
+
+ --bdbstore-transactional Enable (in-memory-only) transactions for
+ all operations in the OSBDB store.
+
+ --debug-bdbstore Set the debug level. Takes an integer
+ argument.
BarrierQueue(BlockDevice *bd, const char *d) : bdev(bd), dev(d) {
barrier();
}
+ ~BarrierQueue() {
+ for (list<Queue*>::iterator p = qls.begin();
+ p != qls.end();
+ ++p)
+ delete *p;
+ qls.clear();
+ }
int size() {
// this isn't perfectly accurate.
if (!qls.empty())
}
+int ObjectCache::try_map_read(block_t start, block_t len)
+{
+ map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
+
+ block_t cur = start;
+ block_t left = len;
+
+ if (p != data.begin() &&
+ (p == data.end() || p->first > cur)) {
+ p--; // might overlap!
+ if (p->first + p->second->length() <= cur)
+ p++; // doesn't overlap.
+ }
+
+ int num_missing = 0;
+
+ while (left > 0) {
+ // at end?
+ if (p == data.end()) {
+ // rest is a miss.
+ vector<Extent> exv;
+ on->map_extents(cur,
+ left, // no prefetch here!
+ exv);
+
+ num_missing += exv.size();
+ left = 0;
+ cur = start+len;
+ break;
+ }
+
+ if (p->first <= cur) {
+ // have it (or part of it)
+ BufferHead *e = p->second;
+
+ if (e->is_clean() ||
+ e->is_dirty() ||
+ e->is_tx()) {
+ dout(20) << "try_map_read hit " << *e << endl;
+ }
+ else if (e->is_rx()) {
+ dout(20) << "try_map_read rx " << *e << endl;
+ num_missing++;
+ }
+ else if (e->is_partial()) {
+ dout(-20) << "try_map_read partial " << *e << endl;
+ num_missing++;
+ }
+ else {
+ dout(0) << "try_map_read got unexpected " << *e << endl;
+ assert(0);
+ }
+
+ block_t lenfromcur = MIN(e->end() - cur, left);
+ cur += lenfromcur;
+ left -= lenfromcur;
+ p++;
+ continue; // more?
+ } else if (p->first > cur) {
+ // gap.. miss
+ block_t next = p->first;
+ vector<Extent> exv;
+ on->map_extents(cur,
+ MIN(next-cur, left), // no prefetch
+ exv);
+
+ dout(-20) << "try_map_read gap of " << p->first-cur << " blocks, "
+ << exv.size() << " extents" << endl;
+ num_missing += exv.size();
+ left -= (p->first - cur);
+ cur = p->first;
+ continue; // more?
+ }
+ else
+ assert(0);
+ }
+
+ assert(left == 0);
+ assert(cur == start+len);
+ return num_missing;
+}
+
+
+
+
/*
* map a range of blocks into buffer_heads.
dout(20) << "map_read partial " << *e << endl;
}
else {
- dout(0) << "map_read ??? " << *e << endl;
+ dout(0) << "map_read ??? got unexpected " << *e << endl;
assert(0);
}
{
dout(10) << "bh_read " << *on << " on " << *bh << endl;
- if (bh->is_missing()) {
+ if (bh->is_missing()) {
mark_rx(bh);
} else {
assert(bh->is_partial());
// this should be empty!!
assert(bh->rx_ioh == 0);
- dout(20) << "bh_read " << *bh << " from " << ex << endl;
+ dout(20) << "bh_read " << *on << " " << *bh << " from " << ex << endl;
C_OC_RxFinish *fin = new C_OC_RxFinish(ebofs_lock, on->oc,
bh->start(), bh->length(),
if (shouldbe)
assert(ex.length == 1 && ex.start == shouldbe);
- dout(20) << "bh_write " << *bh << " to " << ex << endl;
+ dout(20) << "bh_write " << *on << " " << *bh << " to " << ex << endl;
//assert(bh->tx_ioh == 0);
map<block_t, BufferHead*>& missing, // read these from disk
map<block_t, BufferHead*>& rx, // wait for these to finish reading from disk
map<block_t, BufferHead*>& partial); // (maybe) wait for these to read from disk
+ int try_map_read(block_t start, block_t len); // just tell us how many extents we're missing.
+
int map_write(block_t start, block_t len,
interval_set<block_t>& alloc,
#define dout(x) if (x <= g_conf.debug_ebofs) cout << "ebofs(" << dev.get_device_name() << ")."
#define derr(x) if (x <= g_conf.debug_ebofs) cerr << "ebofs(" << dev.get_device_name() << ")."
+
char *nice_blocks(block_t b)
{
static char s[20];
if (!on->have_oc()) {
// nothing is cached. return # of extents in file.
+ dout(10) << "_is_cached have onode but no object cache, returning extent count" << endl;
return on->extent_map.size();
}
map<block_t, BufferHead*> missing; // read these
map<block_t, BufferHead*> rx; // wait for these
map<block_t, BufferHead*> partials; // ??
- on->get_oc(&bc)->map_read(bstart, blen, hits, missing, rx, partials);
- return missing.size() + rx.size() + partials.size();
+
+ int num_missing = on->get_oc(&bc)->try_map_read(bstart, blen);
+ dout(7) << "_is_cached try_map_read reports " << num_missing << " missing extents" << endl;
+ return num_missing;
// FIXME: actually, we should calculate if these extents are contiguous.
// and not using map_read, probably...
assert(cursor.open[cursor.level].size() == 0);
assert(depth == 1);
root = -1;
- depth = 0;
- pool.release(cursor.open[0].node);
+ depth = 0;
+ if (cursor.open[0].node)
+ pool.release(cursor.open[0].node);
}
verify("remove 1");
return 0;
args = nargs;
vec_to_argv(args, argc, argv);
+ // FUSE will chdir("/"); be ready.
+ g_conf.use_abspaths = true;
+
MonMap *monmap = new MonMap(g_conf.num_mon);
Monitor *mon[g_conf.num_mon];
#include "crypto/CryptoLib.h"
using namespace CryptoLib;
-#define NUMMDS g_conf.num_mds
-#define NUMOSD g_conf.num_osd
-#define NUMCLIENT g_conf.num_client
class C_Test : public Context {
public:
}
// create mds
- MDS *mds[NUMMDS];
- OSD *mdsosd[NUMMDS];
- for (int i=0; i<NUMMDS; i++) {
+ MDS *mds[g_conf.num_mds];
+ OSD *mdsosd[g_conf.num_mds];
+ for (int i=0; i<g_conf.num_mds; i++) {
//cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
mds[i] = new MDS(-1, new FakeMessenger(MSG_ADDR_MDS_NEW), monmap);
if (g_conf.mds_local_osd)
}
// create osd
- OSD *osd[NUMOSD];
- for (int i=0; i<NUMOSD; i++) {
+ OSD *osd[g_conf.num_osd];
+ for (int i=0; i<g_conf.num_osd; i++) {
//cerr << "osd" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
osd[i] = new OSD(i, new FakeMessenger(MSG_ADDR_OSD(i)), monmap);
start++;
}
// create client
- Client *client[NUMCLIENT];
- SyntheticClient *syn[NUMCLIENT];
- for (int i=0; i<NUMCLIENT; i++) {
+ Client *client[g_conf.num_client];
+ SyntheticClient *syn[g_conf.num_client];
+ for (int i=0; i<g_conf.num_client; i++) {
//cerr << "client" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
client[i] = new Client(new FakeMessenger(MSG_ADDR_CLIENT(i)), monmap);
start++;
for (int i=0; i<g_conf.num_mon; i++) {
mon[i]->init();
}
- for (int i=0; i<NUMMDS; i++) {
+ for (int i=0; i<g_conf.num_mds; i++) {
mds[i]->init();
if (g_conf.mds_local_osd)
mdsosd[i]->init();
}
- for (int i=0; i<NUMOSD; i++) {
+ for (int i=0; i<g_conf.num_osd; i++) {
osd[i]->init();
}
// create client(s)
- for (int i=0; i<NUMCLIENT; i++) {
+ for (int i=0; i<g_conf.num_client; i++) {
client[i]->init();
// use my argc, argv (make sure you pass a mount point!)
}
- for (int i=0; i<NUMCLIENT; i++) {
+ for (int i=0; i<g_conf.num_client; i++) {
cout << "waiting for synthetic client " << i << " to finish" << endl;
syn[i]->join_thread();
fakemessenger_wait();
// cleanup
- for (int i=0; i<NUMMDS; i++) {
+ for (int i=0; i<g_conf.num_mon; i++) {
+ delete mon[i];
+ }
+ for (int i=0; i<g_conf.num_mds; i++) {
delete mds[i];
}
- for (int i=0; i<NUMOSD; i++) {
+ for (int i=0; i<g_conf.num_osd; i++) {
delete osd[i];
}
- for (int i=0; i<NUMCLIENT; i++) {
+ for (int i=0; i<g_conf.num_client; i++) {
delete client[i];
}
*/
class C_Gather : public Context {
public:
+ bool sub_finish(int r) {
+ //cout << "C_Gather sub_finish " << this << endl;
+ assert(waitfor.count(r));
+ waitfor.erase(r);
+ if (!waitfor.empty())
+ return false; // more subs left
+
+ // last one
+ onfinish->finish(0);
+ delete onfinish;
+ onfinish = 0;
+ return true;
+ }
+
class C_GatherSub : public Context {
C_Gather *gather;
int num;
public:
C_GatherSub(C_Gather *g, int n) : gather(g), num(n) {}
void finish(int r) {
- gather->finish(num);
+ if (gather->sub_finish(num))
+ delete gather; // last one!
}
};
+ Context *new_sub() {
+ num++;
+ waitfor.insert(num);
+ return new C_GatherSub(this, num);
+ }
+
private:
Context *onfinish;
std::set<int> waitfor;
int num;
public:
- C_Gather(Context *f) : onfinish(f), num(0) {}
-
+ C_Gather(Context *f) : onfinish(f), num(0) {
+ //cout << "C_Gather new " << this << endl;
+ }
+ ~C_Gather() {
+ //cout << "C_Gather delete " << this << endl;
+ assert(!onfinish);
+ }
void finish(int r) {
- assert(waitfor.count(r));
- waitfor.erase(r);
- if (waitfor.empty()) {
- onfinish->finish(0);
- delete onfinish;
- }
+ // nobody should ever call me.
+ assert(0);
}
- Context *new_sub() {
- num++;
- waitfor.insert(num);
- return new C_GatherSub(this, num);
- }
};
#endif
out << '.' << o.rev;
return out;
}
+
+
namespace __gnu_cxx {
+#ifndef __LP64__
template<> struct hash<__uint64_t> {
size_t operator()(__uint64_t __x) const {
static hash<__uint32_t> H;
return H((__x >> 32) ^ (__x & 0xffffffff));
}
};
+#endif
template<> struct hash<object_t> {
size_t operator()(const object_t &r) const {
};
}
+
#endif
}
};
+#ifndef __LP64__
template<> struct hash<__int64_t> {
size_t operator()(__int64_t __x) const {
static hash<__int32_t> H;
return H((__x >> 32) ^ (__x & 0xffffffff));
}
};
+#endif
}
--- /dev/null
+#!/usr/bin/perl
+# hi there
+{
+ # startup
+ 'n' => 30, # number of mpi nodes
+ 'sleep' => 3, # seconds to sleep between runs (so you have time to control-c out)
+ 'nummds' => 1,
+ 'numosd' => 6,
+ 'numclient' => 100,
+
+ 'until' => 100, # --syn until $n ... synthetic client will stop itself after this many seconds.
+ 'kill_after' => 300, # seconds before everything commits suicide (in case something hangs)
+
+ # stuff i want to vary
+ # here's a simple example:
+
+ # do --syn writefile command
+ 'writefile' => 1,
+ # and very the write size
+ 'writefile_size' => [ # vary
+# 2048*1024,
+ 1024*1024,
+ 512*1024,
+ 256*1024,
+ 128*1024,
+ 64*1024,
+ 48*1024,
+ 32*1024,
+ 28*1024,
+ 24*1024,
+ 16*1024,
+ 12*1024,
+ 8*1024,
+ 4096,
+# 256,
+# 16,
+# 1
+ ],
+ 'writefile_mb' => 1000, # each client shoudl write 1GB (or more likely, keep going until time runs out)
+
+ 'file_layout_num_rep'=> [1,2], # also vary the replication level
+
+ # pass some other random things to newsyn
+ 'custom' => '--',
+
+ # for final summation (script/sum.pl)
+ # specify time period to look at the results
+ 'start' => 30, # skip first 30 seconds, so that caches are full etc.
+ 'end' => 90, # go for 60 seconds
+
+ # what should i parse/plot?
+ 'comb' => {
+ 'x' => 'writefile_size',
+ 'vars' => [ 'osd.c_wrb', 'osd.r_wrb' ],
+ }
+};
if (anchormgr) { delete anchormgr; anchormgr = NULL; }
if (anchorclient) { delete anchorclient; anchorclient = NULL; }
if (osdmap) { delete osdmap; osdmap = 0; }
+ if (mdsmap) { delete mdsmap; mdsmap = 0; }
+
+ if (server) { delete server; server = 0; }
+ if (locker) { delete locker; locker = 0; }
if (filer) { delete filer; filer = 0; }
if (objecter) { delete objecter; objecter = 0; }
eversion_t pg_trim_to; // primary->replica: trim to here
int op;
- size_t length, offset;
+ size_t length;
+ off_t offset;
+
eversion_t version;
eversion_t old_version;
void set_op(int o) { st.op = o; }
const size_t get_length() { return st.length; }
- const size_t get_offset() { return st.offset; }
+ const off_t get_offset() { return st.offset; }
map<string,bufferptr>& get_attrset() { return attrset; }
void set_attrset(map<string,bufferptr> &as) { attrset = as; }
//void set_rg_nrep(int n) { st.rg_nrep = n; }
void set_length(size_t l) { st.length = l; }
- void set_offset(size_t o) { st.offset = o; }
+ void set_offset(off_t o) { st.offset = o; }
void set_version(eversion_t v) { st.version = v; }
void set_old_version(eversion_t ov) { st.old_version = ov; }
if (g_conf.mkfs) {
store->mkfs();
-
+ store->mount();
+
// i should have already been provided a key via set_new_private_key().
// save it.
// FIXME.
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
-
+#include <errno.h>
void MonitorStore::mount()
{
assert(0);
}
::closedir(d);
+
+ if (g_conf.use_abspaths) {
+ // combine it with the cwd, in case fuse screws things up (i.e. fakefuse)
+ string old = dir;
+ char *cwd = get_current_dir_name();
+ dir = cwd;
+ delete cwd;
+ dir += "/";
+ dir += old;
+ }
}
}
char vs[30];
+#ifdef __LP64__
+ sprintf(vs, "%ld\n", val);
+#else
sprintf(vs, "%lld\n", val);
+#endif
char tfn[200];
sprintf(tfn, "%s.new", fn);
int put_bl_ss(bufferlist& bl, const char *a, const char *b);
bool exists_bl_sn(const char *a, version_t b) {
char bs[20];
+#ifdef __LP64__
+ sprintf(bs, "%lu", b);
+#else
sprintf(bs, "%llu", b);
+#endif
return exists_bl_ss(a, bs);
}
int get_bl_sn(bufferlist& bl, const char *a, version_t b) {
char bs[20];
+#ifdef __LP64__
+ sprintf(bs, "%lu", b);
+#else
sprintf(bs, "%llu", b);
+#endif
return get_bl_ss(bl, a, bs);
}
int put_bl_sn(bufferlist& bl, const char *a, version_t b) {
char bs[20];
+#ifdef __LP64__
+ sprintf(bs, "%lu", b);
+#else
sprintf(bs, "%llu", b);
+#endif
return put_bl_ss(bl, a, bs);
}
<< (osdmap.osds.size() - osdmap.osd_inst.size())
<< " osds to boot" << endl;
}
+
+ delete m;
return;
}
// if epoch != 0 then its incremental
dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has " << mgr->num_incoming() << " queued" << endl;
-
if (!mgr->is_ready()) {
dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has no dispatcher, skipping" << endl;
it++;
FakeMessenger::~FakeMessenger()
{
-
+ // hose any undelivered messages
+ for (list<Message*>::iterator p = incoming.begin();
+ p != incoming.end();
+ ++p)
+ delete *p;
}
assert(directory.count(_myinst.addr) == 1);
shutdown_set.insert(_myinst.addr);
- /*
- directory.erase(myaddr);
- if (directory.empty()) {
- dout(1) << "fakemessenger: last shutdown" << endl;
- ::fm_shutdown = true;
- cond.Signal(); // why not
- }
- */
-
/*
if (loggers[myaddr]) {
delete loggers[myaddr];
lock.Lock();
- // deliver
- try {
#ifdef LOG_MESSAGES
- // stats
- loggers[get_myaddr()]->inc("+send",1);
- loggers[dest]->inc("-recv",1);
-
- char s[20];
- sprintf(s,"+%s", m->get_type_name());
- loggers[get_myaddr()]->inc(s);
- sprintf(s,"-%s", m->get_type_name());
- loggers[dest]->inc(s);
+ // stats
+ loggers[get_myaddr()]->inc("+send",1);
+ loggers[dest]->inc("-recv",1);
+
+ char s[20];
+ sprintf(s,"+%s", m->get_type_name());
+ loggers[get_myaddr()]->inc(s);
+ sprintf(s,"-%s", m->get_type_name());
+ loggers[dest]->inc(s);
#endif
- // queue
- FakeMessenger *dm = directory[inst.addr];
- if (!dm) {
- dout(1) << "** destination " << inst << " dne" << endl;
- for (map<entity_addr_t, FakeMessenger*>::iterator p = directory.begin();
- p != directory.end();
- ++p) {
- dout(1) << "** have " << p->first << " to " << p->second << endl;
- }
- //assert(dm);
- }
- dm->queue_incoming(m);
-
+ // queue
+ if (directory.count(inst.addr)) {
dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl;
-
- }
- catch (...) {
- cout << "no destination " << dest << endl;
- assert(0);
+ directory[inst.addr]->queue_incoming(m);
+ } else {
+ dout(0) << "--> " << get_myname() << " -> " << inst.name << " " << *m
+ << " *** destination DNE ***" << endl;
+ for (map<entity_addr_t, FakeMessenger*>::iterator p = directory.begin();
+ p != directory.end();
+ ++p) {
+ dout(0) << "** have " << p->first << " to " << p->second << endl;
+ }
+ //assert(dm);
+ delete m;
}
-
// wake up loop?
if (!awake) {
dout(10) << "waking up fakemessenger thread" << endl;
using namespace std;
#undef dout
-#define dout(x) if (x <= g_conf.debug_bdbstore) cout << "bdbstore(" << device << ")."
+#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_bdbstore) cout << "bdbstore(" << device << ")@" << __LINE__ << "."
#undef derr
-#define derr(x) if (x <= g_conf.debug_bdbstore) cerr << "bdbstore(" << device << ")."
+#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_bdbstore) cerr << "bdbstore(" << device << ")@" << __LINE__ << "."
\f // Utilities.
\f // Management.
-int OSBDB::opendb(DBTYPE type, int flags)
+int OSBDB::opendb(DBTYPE type, int flags, bool new_env)
{
+ // BDB transactions require an environment.
+ if (g_conf.bdbstore_transactional)
+ {
+ env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
+ env->set_error_stream (&std::cerr);
+ env->set_message_stream (&std::cout);
+ env->set_flags (DB_LOG_INMEMORY, 1);
+ //env->set_flags (DB_DIRECT_DB, 1);
+ int env_flags = (DB_CREATE
+ | DB_THREAD
+ | DB_INIT_LOCK
+ | DB_INIT_MPOOL
+ | DB_INIT_TXN
+ | DB_INIT_LOG
+ | DB_PRIVATE);
+ //if (new_env)
+ // env->remove (env_dir.c_str(), 0);
+ if (env->open (NULL, env_flags, 0) != 0)
+ {
+ std::cerr << "failed to open environment " << std::endl;
+ return -EIO;
+ }
+
+ }
+
db = new Db(env, 0);
db->set_error_stream (&std::cerr);
db->set_message_stream (&std::cout);
db->set_cachesize (0, g_conf.bdbstore_cachesize, 0);
}
+ flags = flags | DB_THREAD;
+ if (transactional)
+ flags = flags | DB_AUTO_COMMIT;
+
int ret;
if ((ret = db->open (NULL, device.c_str(), NULL, type, flags, 0)) != 0)
{
dout(2) << "mount " << device << endl;
if (mounted)
- return 0;
+ {
+ dout(4) << "..already mounted" << endl;
+ return 0;
+ }
if (!opened)
{
int ret;
if ((ret = opendb ()) != 0)
- return ret;
+ {
+ dout(4) << "..returns " << ret << endl;
+ return ret;
+ }
}
// XXX Do we want anything else in the superblock?
value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
if (db->get (NULL, &key, &value, 0) != 0)
- return -EINVAL; // XXX how to say "badly formed fs?"
+ {
+ dout(4) << "..get superblock fails" << endl;
+ return -EINVAL; // XXX how to say "badly formed fs?"
+ }
- dout(2) << ".mount " << super << endl;
+ dout(3) << ".mount " << super << endl;
if (super.version != OSBDB_THIS_VERSION)
- return -EINVAL;
+ {
+ dout(4) << "version mismatch (" << super.version << ")" << endl;
+ return -EINVAL;
+ }
DBTYPE t;
db->get_type (&t);
db->get_flags (&flags);
dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Btree; "
<< "min keys per page: " << minkey << "; flags: "
- << hex << flags << endl;
+ << hex << flags << dec << endl;
cout << dec;
}
else
dout(1) << "mounted version " << OSBDB_THIS_VERSION << "; Hash; "
<< "fill factor: " << ffactor
<< " table size: " << nelem << "; flags: "
- << hex << flags << endl;
+ << hex << flags << dec << endl;
cout << dec;
}
mounted = true;
+ dout(4) << "..mounted" << endl;
return 0;
}
{
if (!mounted)
return -EINVAL;
- sync();
+
+ dout(2) << "umount" << endl;
+
int ret;
if (opened)
{
+ if (transactional)
+ {
+ env->log_flush (NULL);
+ if ((ret = env->lsn_reset (device.c_str(), 0)) != 0)
+ {
+ derr(1) << "lsn_reset: " << db_strerror (ret) << endl;
+ }
+ }
+
+ db->sync (0);
+
if ((ret = db->close (0)) != 0)
{
derr(1) << "close: " << db_strerror(ret) << endl;
}
delete db;
db = NULL;
+
+ if (env)
+ {
+ env->close (0);
+ delete env;
+ env = NULL;
+ }
}
mounted = false;
opened = false;
+ dout(4) << "..unmounted" << endl;
return 0;
}
dout(2) << "mkfs" << endl;
unlink (device.c_str());
+
int ret;
- if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH), DB_CREATE)) != 0)
+ if ((ret = opendb((g_conf.bdbstore_btree ? DB_BTREE : DB_HASH),
+ DB_CREATE, true)) != 0)
{
derr(1) << "failed to open database: " << device << ": "
- << strerror(ret) << std::endl;
+ << db_strerror(ret) << std::endl;
return -EINVAL;
}
opened = true;
ret = db->truncate (NULL, &c, 0);
if (ret != 0)
{
+ derr(1) << "db truncate failed: " << db_strerror (ret) << endl;
return -EIO; // ???
}
Dbt value (&sb, sizeof (sb));
dout(3) << "..writing superblock" << endl;
- if (db->put (NULL, &key, &value, 0) != 0)
+ if ((ret = db->put (NULL, &key, &value, 0)) != 0)
{
- return -EIO; // ???
+ derr(1) << "failed to write superblock: " << db_strerror (ret)
+ << endl;
+ return -EIO;
}
dout(3) << "..wrote superblock" << endl;
-
+ dout(4) << "..mkfs done" << endl;
return 0;
}
int OSBDB::pick_object_revision_lt(object_t& oid)
{
- if (!mounted)
- return -EINVAL;
-
- // XXX this is pretty lame. Can we do better?
- assert(oid.rev > 0);
- oid.rev--;
- while (oid.rev > 0)
- {
- if (exists (oid))
- {
- return 0;
- }
- oid.rev--;
- }
- return -EEXIST; // FIXME
+ // Not really needed.
+ dout(0) << "pick_object_revision_lt " << oid << endl;
+ return -ENOSYS;
}
bool OSBDB::exists(object_t oid)
{
dout(2) << "exists " << oid << endl;
struct stat st;
- return (stat (oid, &st) == 0);
+ bool ret = (stat (oid, &st) == 0);
+ dout(4) << "..returns " << ret << endl;
+ return ret;
}
int OSBDB::statfs (struct statfs *st)
{
- return -ENOSYS;
+ // Hacky?
+ if (::statfs (device.c_str(), st) != 0)
+ {
+ int ret = -errno;
+ derr(1) << "statfs returns " << ret << endl;
+ return ret;
+ }
+ st->f_type = OSBDB_MAGIC;
+ dout(4) << "..statfs OK" << endl;
+ return 0;
}
int OSBDB::stat(object_t oid, struct stat *st)
{
if (!mounted)
- return -EINVAL;
+ {
+ dout(4) << "not mounted!" << endl;
+ return -EINVAL;
+ }
dout(2) << "stat " << oid << endl;
st->st_size = obj.length;
dout(3) << "stat length:" << obj.length << endl;
+ dout(4) << "..stat OK" << endl;
return 0;
}
int OSBDB::remove(object_t oid, Context *onsafe)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted!" << endl;
+ return -EINVAL;
+ }
dout(2) << "remove " << oid << endl;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
oid_t id;
mkoid(id, oid);
Dbt key (&id, sizeof (oid_t));
db->del (NULL, &key, 0);
+
object_inode_key _ikey = new_object_inode_key (oid);
Dbt ikey (&_ikey, sizeof_object_inode_key());
- db->del (NULL, &ikey, 0);
+ db->del (txn, &ikey, 0);
attrs_id aids = new_attrs_id (oid);
Dbt askey (&aids, sizeof_attrs_id());
Dbt asval;
asval.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &askey, &asval, 0) == 0)
+ if (db->get (txn, &askey, &asval, 0) == 0)
{
// We have attributes; remove them.
stored_attrs *sap = (stored_attrs *) asval.get_data();
{
attr_id aid = new_attr_id (oid, sap->names[i].name);
Dbt akey (&aid, sizeof (aid));
- db->del (NULL, &akey, 0);
+ db->del (txn, &akey, 0);
}
- db->del (NULL, &askey, 0);
+ db->del (txn, &askey, 0);
}
+ // XXX check del return value
+
+ if (txn)
+ txn->commit (0);
+ dout(4) << "..remove OK" << endl;
return 0;
}
int OSBDB::truncate(object_t oid, off_t size, Context *onsafe)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted!" << endl;
+ return -EINVAL;
+ }
dout(2) << "truncate " << size << endl;
if (size > 0xFFFFFFFF)
- return -ENOSPC;
+ {
+ derr(1) << "object size too big!" << endl;
+ return -ENOSPC;
+ }
+
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
object_inode_key ikey = new_object_inode_key (oid);
stored_object obj;
value.set_ulen (sizeof (obj));
value.set_flags (DB_DBT_USERMEM);
- if (db->get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ if (db->get (txn, &key, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ dout(4) << "..returns -ENOENT" << endl;
+ return -ENOENT;
+ }
if (obj.length < size)
{
newVal.set_dlen (1);
newVal.set_ulen (1);
newVal.set_flags (DB_DBT_PARTIAL);
- if (db->put (NULL, &okey, &newVal, 0) != 0)
- return -EIO;
+ if (db->put (txn, &okey, &newVal, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << ".updating object failed" << endl;
+ return -EIO;
+ }
obj.length = size;
value.set_ulen (sizeof (obj));
- if (db->put (NULL, &key, &value, 0) != 0)
- return -EIO;
+ if (db->put (txn, &key, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << ".updating object info failed" << endl;
+ return -EIO;
+ }
}
else if (obj.length > size)
{
Dbt tval (&obj, sizeof (obj));
tval.set_ulen (sizeof (obj));
tval.set_flags (DB_DBT_USERMEM);
- if (db->put (NULL, &key, &tval, 0) != 0)
- return -EIO;
+ if (db->put (txn, &key, &tval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << ".updating object info failed" << endl;
+ return -EIO;
+ }
if (size == 0)
{
char x[1];
mkoid (id, oid);
Dbt okey (&id, sizeof (oid_t));
Dbt oval (&x, 0);
- if (db->put (NULL, &okey, &oval, 0) != 0)
- return -EIO;
+ if (db->put (txn, &okey, &oval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << ".updating object failed" << endl;
+ return -EIO;
+ }
}
else
{
Dbt okey (&id, sizeof (oid_t));
Dbt oval;
oval.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &okey, &oval, 0) != 0)
- return -EIO;
+ if (db->get (txn, &okey, &oval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << ".getting old object failed" << endl;
+ return -EIO;
+ }
auto_ptr<char> ovalPtr ((char *) oval.get_data());
oval.set_size ((size_t) size);
oval.set_ulen ((size_t) size);
- if (db->put (NULL, &okey, &oval, 0) != 0)
- return -EIO;
+ if (db->put (txn, &okey, &oval, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << ".putting new object failed" << endl;
+ return -EIO;
+ }
}
}
+ if (txn)
+ txn->commit (0);
+
+ dout(4) << "..truncate OK" << endl;
return 0;
}
int OSBDB::read(object_t oid, off_t offset, size_t len, bufferlist& bl)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted!" << endl;
+ return -EINVAL;
+ }
dout(2) << "read " << oid << " " << offset << " "
<< len << endl;
DbTxn *txn = NULL;
- //env->txn_begin (NULL, &txn, 0);
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
object_inode_key _ikey = new_object_inode_key (oid);
stored_object obj;
int ret;
if ((ret = db->get (txn, &ikey, &ival, 0)) != 0)
{
- //txn->abort();
+ if (txn)
+ txn->abort();
derr(1) << "get returned " << db_strerror (ret) << endl;
return -ENOENT;
}
if ((ret = db->get (txn, &key, &value, 0)) != 0)
{
derr(1) << " get returned " << db_strerror (ret) << endl;
- //txn->abort();
+ if (txn)
+ txn->abort();
return -EIO;
}
}
else
{
if (offset > obj.length)
- return 0;
+ {
+ dout(2) << "..offset out of range" << endl;
+ return 0;
+ }
if (offset + len > obj.length)
len = obj.length - (size_t) offset;
dout(3) << " doing partial read of " << len << endl;
value.set_ulen (len);
value.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
dout(3) << " getting " << oid << endl;
- if ((ret = db->get (NULL, &key, &value, 0)) != 0)
+ if ((ret = db->get (txn, &key, &value, 0)) != 0)
{
derr(1) << "get returned " << db_strerror (ret) << endl;
- //txn->abort();
+ if (txn)
+ txn->abort();
return -EIO;
}
}
- //txn->commit (0);
+ if (txn)
+ txn->commit (0);
+ dout(4) << "..read OK, returning " << len << endl;
return len;
}
bufferlist& bl, Context *onsafe)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted!" << endl;
+ return -EINVAL;
+ }
dout(2) << "write " << oid << " " << offset << " "
<< len << endl;
if (offset > 0xFFFFFFFFL || offset + len > 0xFFFFFFFFL)
- return -ENOSPC;
+ {
+ derr(1) << "object too big" << endl;
+ return -ENOSPC;
+ }
DbTxn *txn = NULL;
- //env->txn_begin (NULL, &txn, 0);
+ if (transactional)
+ env->txn_begin (txn, &txn, 0);
object_inode_key _ikey = new_object_inode_key (oid);
stored_object obj;
ival.set_flags (DB_DBT_USERMEM);
int ret;
- dout(3) << " getting " << _ikey << endl;
+ dout(3) << "..getting " << _ikey << endl;
if (db->get (txn, &ikey, &ival, 0) != 0)
{
- dout(3) << " writing new object" << endl;
+ dout(3) << "..writing new object" << endl;
// New object.
obj.length = (size_t) offset + len;
- dout(3) << " mapping " << _ikey << " => "
+ dout(3) << "..mapping " << _ikey << " => "
<< obj << endl;
if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
{
- derr(1) << " put returned " << db_strerror (ret) << endl;
+ derr(1) << "..put returned " << db_strerror (ret) << endl;
+ if (txn)
+ txn->abort();
return -EIO;
}
value.set_doff ((size_t) offset);
value.set_dlen (len);
}
- dout(3) << " mapping " << oid << " => ("
+ dout(3) << "..mapping " << oid << " => ("
<< obj.length << " bytes)" << endl;
if ((ret = db->put (txn, &key, &value, 0)) != 0)
{
- derr(1) << " put returned " << db_strerror (ret) << endl;
+ derr(1) << "..put returned " << db_strerror (ret) << endl;
+ if (txn)
+ txn->abort();
return -EIO;
}
+
+ if (txn)
+ txn->commit (0);
+
+ dout(4) << "..write OK, returning " << len << endl;
return len;
}
obj.length = len;
if ((ret = db->put (txn, &ikey, &ival, 0)) != 0)
{
- derr(1) << " put returned " << db_strerror (ret) << endl;
+ derr(1) << " put returned " << db_strerror (ret) << endl;
+ if (txn)
+ txn->abort();
return -EIO;
}
}
Dbt value (bl.c_str(), len);
if (db->put (txn, &key, &value, 0) != 0)
{
+ if (txn)
+ txn->abort();
+ derr(1) << "..writing object failed!" << endl;
return -EIO;
}
}
if (offset + len > obj.length)
{
obj.length = (size_t) offset + len;
- if (db->put (NULL, &ikey, &ival, 0) != 0)
+ if (db->put (txn, &ikey, &ival, 0) != 0)
{
+ if (txn)
+ txn->abort();
+ derr(1) << "..writing object info failed!" << endl;
return -EIO;
}
}
value.set_dlen (len);
value.set_ulen (len);
value.set_flags (DB_DBT_PARTIAL);
- if (db->put (NULL, &key, &value, 0) != 0)
+ if (db->put (txn, &key, &value, 0) != 0)
{
+ if (txn)
+ txn->abort();
+ derr(1) << "..writing object failed!" << endl;
return -EIO;
}
}
+ if (txn)
+ txn->commit (0);
+
+ dout(4) << "..write OK, returning " << len << endl;
return len;
}
int OSBDB::clone(object_t oid, object_t noid)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted!" << endl;
+ return -EINVAL;
+ }
dout(2) << "clone " << oid << ", " << noid << endl;
if (exists (noid))
- return -EEXIST;
+ {
+ dout(4) << "..target exists; returning -EEXIST" << endl;
+ return -EEXIST;
+ }
+
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
object_inode_key _ikey = new_object_inode_key (oid);
object_inode_key _nikey = new_object_inode_key (noid);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &ikey, &ival, 0) != 0)
- return -ENOENT;
- if (db->get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ if (db->get (txn, &ikey, &ival, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << "..getting object info failed!" << endl;
+ return -ENOENT;
+ }
+ if (db->get (txn, &key, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << "..getting original object failed" << endl;
+ return -ENOENT;
+ }
auto_ptr<char> valueptr ((char *) value.get_data());
- if (db->put (NULL, &nikey, &ival, 0) != 0)
- return -EIO;
- if (db->put (NULL, &nkey, &value, 0) != 0)
- return -EIO;
+ if (db->put (txn, &nikey, &ival, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << "..putting object info failed" << endl;
+ return -EIO;
+ }
+ if (db->put (txn, &nkey, &value, 0) != 0)
+ {
+ if (txn)
+ txn->abort();
+ derr(1) << "..putting new object failed" << endl;
+ return -EIO;
+ }
+
+ if (txn)
+ txn->commit (0);
+ dout(4) << "..clone OK" << endl;
return 0;
}
int OSBDB::list_collections(list<coll_t>& ls)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted!" << endl;
+ return -EINVAL;
+ }
dout(2) << "list_collections" << endl;
value.set_flags (DB_DBT_MALLOC);
if (db->get (NULL, &key, &value, 0) != 0)
- return 0; // no collections.
+ {
+ dout(4) << "..no collections" << endl;
+ return 0; // no collections.
+ }
auto_ptr<stored_colls> sc ((stored_colls *) value.get_data());
stored_colls *scp = sc.get();
for (uint32_t i = 0; i < sc->count; i++)
ls.push_back (scp->colls[i]);
+ dout(4) << "..list_collections returns " << scp->count << endl;
return scp->count;
}
int OSBDB::create_collection(coll_t c, Context *onsafe)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted" << endl;
+ return -EINVAL;
+ }
- dout(2) << "create_collection " << c << endl;
+ dout(2) << "create_collection " << hex << c << dec << endl;
Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
stored_colls *scp = NULL;
size_t sz = 0;
bool created = false;
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (db->get (txn, &key, &value, 0) != 0)
{
sz = sizeof (stored_colls) + sizeof (coll_t);
scp = (stored_colls *) malloc (sz);
if (scp->count > 0)
ins = binary_search<coll_t> (scp->colls, scp->count, c);
if (scp->colls[ins] == c)
- return -EEXIST;
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".collection " << c << " already exists " << endl;
+ return -EEXIST;
+ }
dout(3) << "..insertion point: " << ins << endl;
// Put the modified collection list back.
{
Dbt value2 (scp, sz);
- if (db->put (NULL, &key, &value2, 0) != 0)
+ if (db->put (txn, &key, &value2, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".writing new collections list failed" << endl;
return -EIO;
}
}
new_coll.count = 0;
Dbt coll_key (&c, sizeof (coll_t));
Dbt coll_value (&new_coll, sizeof (stored_coll));
- if (db->put (NULL, &coll_key, &coll_value, 0) != 0)
+ if (db->put (txn, &coll_key, &coll_value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".writing new collection failed" << endl;
return -EIO;
}
}
+ if (txn != NULL)
+ txn->commit (0);
+
+ dout(4) << "..create_collection OK" << endl;
return 0;
}
int OSBDB::destroy_collection(coll_t c, Context *onsafe)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted" << endl;
+ return -EINVAL;
+ }
- dout(2) << "destroy_collection " << c << endl;
+ dout(2) << "destroy_collection " << hex << c << dec << endl;
Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &key, &value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".collection list doesn't exist" << endl;
return -ENOENT; // XXX
}
auto_ptr<stored_colls> valueBuf (scp);
if (scp->count == 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".collection " << c << " not listed" << endl;
return -ENOENT;
}
uint32_t ins = binary_search<coll_t> (scp->colls, scp->count, c);
+ dout(4) << "..insertion point is " << ins << endl;
if (scp->colls[ins] != c)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".collection " << c << " not listed" << endl;
return -ENOENT;
}
+ dout(4) << "..collections list is " << scp << endl;
+
// Move the rest of the list down in memory, if needed.
- if (ins < scp->count - 1)
+ if (ins < scp->count)
{
size_t n = scp->count - ins - 1;
+ dout(4) << "..shift list down " << n << endl;
memmove (&scp->colls[ins], &scp->colls[ins + 1], n);
}
+ dout(4) << "..collections list is " << scp << endl;
+
// Modify the record size to be one less.
Dbt nvalue (scp, value.get_size() - sizeof (coll_t));
nvalue.set_flags (DB_DBT_USERMEM);
- if (db->put (NULL, &key, &nvalue, 0) != 0)
+ if (db->put (txn, &key, &nvalue, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".putting modified collection list failed" << endl;
return -EIO;
}
// Delete the collection.
Dbt collKey (&c, sizeof (coll_t));
- if (db->del (NULL, &collKey, 0) != 0)
+ if (db->del (txn, &collKey, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".deleting collection failed" << endl;
return -EIO;
}
+ if (txn != NULL)
+ txn->commit (0);
+ dout(4) << "..destroy_collection OK" << endl;
return 0;
}
bool OSBDB::collection_exists(coll_t c)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted" << endl;
+ return -EINVAL;
+ }
- dout(2) << "collection_exists " << c << endl;
+ dout(2) << "collection_exists " << hex << c << dec << endl;
- Dbt key (COLLECTIONS_KEY, 1);
+ /*Dbt key (COLLECTIONS_KEY, 1);
Dbt value;
value.set_flags (DB_DBT_MALLOC);
if (db->get (NULL, &key, &value, 0) != 0)
- return false;
+ {
+ dout(4) << "..no collection list; return false" << endl;
+ return false;
+ }
stored_colls *scp = (stored_colls *) value.get_data();
auto_ptr<stored_colls> sc (scp);
+ dout(5) << "..collection list is " << scp << endl;
if (scp->count == 0)
- return false;
+ {
+ dout(4) << "..empty collection list; return false" << endl;
+ return false;
+ }
uint32_t ins = binary_search<coll_t> (scp->colls, scp->count, c);
+ dout(4) << "..insertion point is " << ins << endl;
- return (scp->colls[ins] == c);
+ int ret = (scp->colls[ins] == c);
+ dout(4) << "..returns " << ret << endl;
+ return ret;*/
+
+ Dbt key (&c, sizeof (coll_t));
+ Dbt value;
+ value.set_flags (DB_DBT_MALLOC);
+ if (db->get (NULL, &key, &value, 0) != 0)
+ {
+ dout(4) << "..no collection, return false" << endl;
+ return false;
+ }
+ void *val = value.get_data();
+ free (val);
+ dout(4) << "..collection exists; return true" << endl;
+ return true;
}
int OSBDB::collection_stat(coll_t c, struct stat *st)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted" << endl;
+ return -EINVAL;
+ }
dout(2) << "collection_stat " << c << endl;
+ // XXX is this needed?
return -ENOSYS;
}
int OSBDB::collection_add(coll_t c, object_t o, Context *onsafe)
{
if (!mounted)
- return -EINVAL;
+ {
+ dout(2) << "not mounted" << endl;
+ return -EINVAL;
+ }
- dout(2) << "collection_add " << c << " " << o << endl;
+ dout(2) << "collection_add " << hex << c << dec << " " << o << endl;
Dbt key (&c, sizeof (coll_t));
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &key, &value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << "failed to find collection" << endl;
return -ENOENT;
}
// Already there?
if (scp->objects[ins] == o)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << "collection already has object" << endl;
return -EEXIST;
}
}
scp = (stored_coll *) realloc (scp, sz);
sc.release();
sc.reset (scp);
- if (ins < scp->count)
+ dout(3) << "..current collection: " << scp << endl;
+ if (ins < scp->count - 1)
{
size_t n = (scp->count - ins) * sizeof (object_t);
+ dout(3) << "..move up " << n << " bytes" << endl;
memmove (&scp->objects[ins + 1], &scp->objects[ins], n);
}
scp->count++;
dout(3) << "..collection: " << scp << endl;
Dbt nvalue (scp, sz);
- if (db->put (NULL, &key, &nvalue, 0) != 0)
+ if (db->put (txn, &key, &nvalue, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << "..putting modified collection failed" << endl;
return -EIO;
}
+ if (txn != NULL)
+ txn->commit (0);
+ dout(4) << "..collection add OK" << endl;
return 0;
}
int OSBDB::collection_remove(coll_t c, object_t o, Context *onsafe)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted" << endl;
+ return -EINVAL;
+ }
- dout(2) << "collection_remove " << c << " " << o << endl;
+ dout(2) << "collection_remove " << hex << c << dec << " " << o << endl;
Dbt key (&c, sizeof (coll_t));
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
- if (db->get (NULL, &key, &value, 0) != 0)
+ if (db->get (txn, &key, &value, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ dout(1) << "..collection doesn't exist" << endl;
return -ENOENT;
}
stored_coll *scp = (stored_coll *) value.get_data();
auto_ptr<stored_coll> sc (scp);
+ dout(5) << "..collection is " << scp << endl;
if (scp->count == 0)
{
+ if (txn != NULL)
+ txn->abort();
+ dout(1) << "..collection is empty" << endl;
return -ENOENT;
}
uint32_t ins = binary_search<object_t> (scp->objects, scp->count, o);
+ dout(4) << "..insertion point is " << ins << endl;
if (scp->objects[ins] != o)
{
+ if (txn != NULL)
+ txn->abort();
+ dout(1) << "..object not in collection" << endl;
return -ENOENT;
}
if (ins < scp->count - 1)
{
size_t n = (scp->count - ins - 1) * sizeof (object_t);
+ dout(5) << "..moving " << n << " bytes down" << endl;
memmove (&scp->objects[ins], &scp->objects[ins + 1], n);
}
scp->count--;
dout(3) << "..collection " << scp << endl;
Dbt nval (scp, value.get_size() - sizeof (object_t));
- if (db->put (NULL, &key, &nval, 0) != 0)
+ if (db->put (txn, &key, &nval, 0) != 0)
{
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << "..putting modified collection failed" << endl;
return -EIO;
}
+ if (txn != NULL)
+ txn->commit (0);
+ dout(4) << "..collection remove OK" << endl;
return 0;
}
int OSBDB::collection_list(coll_t c, list<object_t>& o)
{
if (!mounted)
- return -EINVAL;
+ {
+ derr(1) << "not mounted" << endl;
+ return -EINVAL;
+ }
Dbt key (&c, sizeof (coll_t));
Dbt value;
- if (db->get (NULL, &key, &value, 0) != 0)
- return -ENOENT;
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &key, &value, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
stored_coll *scp = (stored_coll *) value.get_data();
auto_ptr<stored_coll> sc (scp);
for (uint32_t i = 0; i < scp->count; i++)
o.push_back (scp->objects[i]);
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
\f // Attributes
int OSBDB::_setattr(object_t oid, const char *name,
- const void *value, size_t size, Context *onsafe)
+ const void *value, size_t size, Context *onsafe,
+ DbTxn *txn)
{
if (!mounted)
return -EINVAL;
size_t sz = 0;
dout(3) << " getting " << aids << endl;
- if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+ if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
{
dout(2) << " first attribute" << endl;
sz = sizeof (stored_attrs);
{
sz = attrs_val.get_size();
sap = (stored_attrs *) attrs_val.get_data();
- dout(2) << " add to list of " << sap->count << " attrs" << endl;
+ dout(2) << "..add to list of " << sap->count << " attrs" << endl;
}
auto_ptr<stored_attrs> sa (sap);
int ins = 0;
if (sap->count > 0)
ins = binary_search<attr_name> (sap->names, sap->count, _name);
- dout(3) << " insertion point is " << ins << endl;
+ dout(3) << "..insertion point is " << ins << endl;
if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
{
sz += sizeof (attr_name);
- dout(3) << " realloc 0x" << hex << ((void *) sap) << " to "
+ dout(3) << "..realloc " << ((void *) sap) << " to "
<< dec << sz << endl;
sap = (stored_attrs *) realloc (sap, sz);
- dout(3) << " returns 0x" << hex << ((void *) sap) << endl;
+ dout(3) << "..returns " << ((void *) sap) << endl;
sa.release ();
sa.reset (sap);
int n = (sap->count - ins) * sizeof (attr_name);
if (n > 0)
{
- dout(3) << " move " << n << " bytes from 0x"
+ dout(3) << "..move " << n << " bytes from 0x"
<< hex << (&sap->names[ins]) << " to 0x"
- << hex << (&sap->names[ins+1]) << endl;
+ << hex << (&sap->names[ins+1]) << dec << endl;
memmove (&sap->names[ins+1], &sap->names[ins], n);
}
memset (&sap->names[ins], 0, sizeof (attr_name));
Dbt newAttrs_val (sap, sz);
newAttrs_val.set_ulen (sz);
newAttrs_val.set_flags (DB_DBT_USERMEM);
- dout(3) << " putting " << aids << endl;
- if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
- return -EIO;
+ dout(3) << "..putting " << aids << endl;
+ if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
+ {
+ derr(1) << ".writing attributes list failed" << endl;
+ return -EIO;
+ }
}
else
{
- dout(3) << " attribute " << name << " already exists" << endl;
+ dout(3) << "..attribute " << name << " already exists" << endl;
}
- dout(3) << " attributes list: " << sap << endl;
+ dout(5) << "..attributes list: " << sap << endl;
// Add the attribute.
attr_id aid = new_attr_id (oid, name);
Dbt attr_key (&aid, sizeof (aid));
Dbt attr_val ((void *) value, size);
- dout(3) << " writing attribute key " << aid << endl;
- if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
- return -EIO;
+ dout(3) << "..writing attribute key " << aid << endl;
+ if (db->put (txn, &attr_key, &attr_val, 0) != 0)
+ {
+ derr(1) << ".writing attribute key failed" << endl;
+ return -EIO;
+ }
+ dout(4) << "..setattr OK" << endl;
return 0;
}
if (!mounted)
return -EINVAL;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
dout(2) << "setattr " << oid << ":" << name << " => ("
<< size << " bytes)" << endl;
- int ret = _setattr (oid, name, value, size, onsafe);
+ int ret = _setattr (oid, name, value, size, onsafe, txn);
+ if (ret == 0)
+ {
+ if (txn != NULL)
+ txn->commit (0);
+ }
+ else
+ {
+ if (txn != NULL)
+ txn->abort();
+ }
return ret;
}
if (!mounted)
return -EINVAL;
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
map<string,bufferptr>::iterator it;
for (it = aset.begin(); it != aset.end(); it++)
{
string name = it->first;
bufferptr value = it->second;
int ret = _setattr (oid, name.c_str(), value.c_str(),
- value.length(), onsafe);
+ value.length(), onsafe, txn);
if (ret != 0)
{
+ if (txn != NULL)
+ txn->abort();
return ret;
}
}
+
+ if (txn != NULL)
+ txn->commit (0);
return 0;
}
if (!mounted)
return -EINVAL;
+ dout(2) << "_getattr " << oid << " " << name << " " << size << endl;
+
attr_id aid = new_attr_id (oid, name);
Dbt key (&aid, sizeof (aid));
Dbt val (value, size);
val.set_ulen (size);
+ val.set_doff (0);
+ val.set_dlen (size);
val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
if (db->get (NULL, &key, &val, 0) != 0)
{
+ derr(1) << ".getting value failed" << endl;
return -ENOENT;
}
+ dout(4) << ".._getattr OK; returns " << val.get_size() << endl;
return val.get_size();
}
if (!mounted)
return -EINVAL;
- int count = 0;
for (map<string,bufferptr>::iterator it = aset.begin();
it != aset.end(); it++)
{
(*it).second.length());
if (ret < 0)
return ret;
- count += ret;
}
- return count;
+ return 0;
}
int OSBDB::rmattr(object_t oid, const char *name, Context *onsafe)
{
if (!mounted)
return -EINVAL;
+
+ dout(2) << "rmattr " << oid << " " << name << endl;
+
attrs_id aids = new_attrs_id (oid);
Dbt askey (&aids, sizeof_attrs_id());
Dbt asvalue;
asvalue.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &askey, &asvalue, 0) != 0)
- return -ENOENT;
+ DbTxn *txn = NULL;
+
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &askey, &asvalue, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ return -ENOENT;
+ }
stored_attrs *sap = (stored_attrs *) asvalue.get_data();
auto_ptr<stored_attrs> sa (sap);
+ dout(5) << "..attributes list " << sap << endl;
+
if (sap->count == 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".empty attribute list" << endl;
+ return -ENOENT;
+ }
attr_name _name;
- memset(&name, 0, sizeof (_name));
+ memset(&_name, 0, sizeof (_name));
strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
int ins = binary_search<attr_name> (sap->names, sap->count, _name);
+ dout(4) << "..insertion point is " << ins << endl;
if (strcmp (sap->names[ins].name, name) != 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".attribute not found in list" << endl;
+ return -ENOENT;
+ }
// Shift the later elements down by one, if needed.
int n = (sap->count - ins) * sizeof (attr_name);
if (n > 0)
- memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+ {
+ dout(4) << "..shift down by " << n << endl;
+ memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+ }
sap->count--;
+ dout(5) << "..attributes list now " << sap << endl;
+
asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
int ret;
- if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+ if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
{
derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
// Remove the attribute.
attr_id aid = new_attr_id (oid, name);
Dbt key (&aid, sizeof (aid));
- if ((ret = db->del (NULL, &key, 0)) != 0)
- derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if ((ret = db->del (txn, &key, 0)) != 0)
+ {
+ derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if (txn != NULL)
+ txn->abort();
+ return -EIO;
+ }
+ if (txn != NULL)
+ txn->commit (0);
+ dout(4) << "..rmattr OK" << endl;
return 0;
}
Dbt value;
value.set_flags (DB_DBT_MALLOC);
+ // XXX Transactions for read atomicity???
+
int ret;
if ((ret = db->get (NULL, &key, &value, 0)) != 0)
{
p[n] = '\0';
p = p + n + 1;
}
+
+ dout(4) << "listattr OK" << endl;
return 0;
}
if (!mounted)
return -EINVAL;
- dout(2) << "collection_setattr" << cid << " " << name
+ dout(2) << "collection_setattr " << hex << cid << dec << " " << name
<< " (" << size << " bytes)" << endl;
if (strlen (name) >= OSBDB_MAX_ATTR_LEN)
- return -ENAMETOOLONG;
+ {
+ derr(1) << "name too long" << endl;
+ return -ENAMETOOLONG;
+ }
// Add name to attribute list, if needed.
coll_attrs_id aids = new_coll_attrs_id (cid);
stored_attrs *sap = NULL;
size_t sz = 0;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
dout(3) << " getting " << aids << endl;
- if (db->get (NULL, &attrs_key, &attrs_val, 0) != 0)
+ if (db->get (txn, &attrs_key, &attrs_val, 0) != 0)
{
dout(2) << " first attribute" << endl;
sz = sizeof (stored_attrs);
if (sap->count == 0 || strcmp (sap->names[ins].name, name) != 0)
{
sz += sizeof (attr_name);
- dout(3) << " realloc 0x" << hex << ((void *) sap) << " to "
+ dout(3) << " realloc " << hex << ((void *) sap) << " to "
<< dec << sz << endl;
sap = (stored_attrs *) realloc (sap, sz);
- dout(3) << " returns 0x" << hex << ((void *) sap) << endl;
+ dout(3) << " returns " << hex << ((void *) sap) << dec << endl;
sa.release ();
sa.reset (sap);
int n = (sap->count - ins) * sizeof (attr_name);
{
dout(3) << " move " << n << " bytes from 0x"
<< hex << (&sap->names[ins]) << " to 0x"
- << hex << (&sap->names[ins+1]) << endl;
+ << hex << (&sap->names[ins+1]) << dec << endl;
memmove (&sap->names[ins+1], &sap->names[ins], n);
}
memset (&sap->names[ins], 0, sizeof (attr_name));
newAttrs_val.set_ulen (sz);
newAttrs_val.set_flags (DB_DBT_USERMEM);
dout(3) << " putting " << aids << endl;
- if (db->put (NULL, &attrs_key, &newAttrs_val, 0) != 0)
- return -EIO;
+ if (db->put (txn, &attrs_key, &newAttrs_val, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".putting new attributes failed" << endl;
+ return -EIO;
+ }
}
else
{
- dout(3) << " attribute " << name << " already exists" << endl;
+ dout(3) << "..attribute " << name << " already exists" << endl;
}
- dout(3) << " attributes list: " << sap << endl;
+ dout(3) << "..attributes list: " << sap << endl;
// Add the attribute.
coll_attr_id aid = new_coll_attr_id (cid, name);
Dbt attr_key (&aid, sizeof (aid));
Dbt attr_val ((void *) value, size);
dout(3) << " writing attribute key " << aid << endl;
- if (db->put (NULL, &attr_key, &attr_val, 0) != 0)
- return -EIO;
+ if (db->put (txn, &attr_key, &attr_val, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".putting attribute failed" << endl;
+ return -EIO;
+ }
+
+ if (txn != NULL)
+ txn->commit (0);
+ dout(4) << "..collection setattr OK" << endl;
return 0;
}
if (!mounted)
return -EINVAL;
+ dout(2) << "collection_rmattr " << hex << cid << dec
+ << " " << name << endl;
+
coll_attrs_id aids = new_coll_attrs_id (cid);
Dbt askey (&aids, sizeof_coll_attrs_id());
Dbt asvalue;
asvalue.set_flags (DB_DBT_MALLOC);
- if (db->get (NULL, &askey, &asvalue, 0) != 0)
- return -ENOENT;
+ DbTxn *txn = NULL;
+ if (transactional)
+ env->txn_begin (NULL, &txn, 0);
+
+ if (db->get (txn, &askey, &asvalue, 0) != 0)
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".no attributes list" << endl;
+ return -ENOENT;
+ }
stored_attrs *sap = (stored_attrs *) asvalue.get_data();
auto_ptr<stored_attrs> sa (sap);
+ dout(5) << "..attributes list " << sap << endl;
if (sap->count == 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".empty attributes list" << endl;
+ return -ENOENT;
+ }
attr_name _name;
- memset(&name, 0, sizeof (_name));
+ memset(&_name, 0, sizeof (_name));
strncpy (_name.name, name, OSBDB_MAX_ATTR_LEN);
int ins = binary_search<attr_name> (sap->names, sap->count, _name);
if (strcmp (sap->names[ins].name, name) != 0)
- return -ENOENT;
+ {
+ if (txn != NULL)
+ txn->abort();
+ derr(1) << ".attribute not listed" << endl;
+ return -ENOENT;
+ }
// Shift the later elements down by one, if needed.
int n = (sap->count - ins) * sizeof (attr_name);
if (n > 0)
- memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+ {
+ dout(4) << "..shift down by " << n << endl;
+ memmove (&(sap->names[ins]), &(sap->names[ins + 1]), n);
+ }
sap->count--;
+ dout(5) << "..attributes list now " << sap << endl;
+
asvalue.set_size(asvalue.get_size() - sizeof (attr_name));
int ret;
- if ((ret = db->put (NULL, &askey, &asvalue, 0)) != 0)
+ if ((ret = db->put (txn, &askey, &asvalue, 0)) != 0)
{
derr(1) << "put stored_attrs " << db_strerror (ret) << endl;
+ if (txn != NULL)
+ txn->abort();
return -EIO;
}
// Remove the attribute.
coll_attr_id aid = new_coll_attr_id (cid, name);
Dbt key (&aid, sizeof (aid));
- if ((ret = db->del (NULL, &key, 0)) != 0)
- derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if ((ret = db->del (txn, &key, 0)) != 0)
+ {
+ derr(1) << "deleting " << aid << ": " << db_strerror(ret) << endl;
+ if (txn != NULL)
+ txn->abort();
+ return -EIO;
+ }
+ if (txn != NULL)
+ txn->commit (0);
+
+ dout(4) << "..collection rmattr OK" << endl;
return 0;
}
if (!mounted)
return -EINVAL;
- dout(2) << "collection_getattr " << cid << " " << name << endl;
+ dout(2) << "collection_getattr " << hex << cid << dec
+ << " " << name << endl;
+
+ // XXX transactions/read isolation?
coll_attr_id caid = new_coll_attr_id (cid, name);
Dbt key (&caid, sizeof (caid));
val.set_flags (DB_DBT_USERMEM | DB_DBT_PARTIAL);
if (db->get (NULL, &key, &val, 0) != 0)
- return -ENOENT;
+ {
+ derr(1) << ".no attribute entry" << endl;
+ return -ENOENT;
+ }
+ dout(4) << "..collection getattr OK; returns " << val.get_size() << endl;
return val.get_size();
}
if (!mounted)
return -EINVAL;
- dout(2) << "collection_listattr " << cid << endl;
+ dout(2) << "collection_listattr " << hex << cid << dec << endl;
+
+ // XXX transactions/read isolation?
coll_attrs_id caids = new_coll_attrs_id (cid);
Dbt key (&caids, sizeof_coll_attrs_id());
#include <db_cxx.h>
#include "osd/ObjectStore.h"
-// Redefine this to use a different BDB access type. DB_BTREE is
-// probably the only other one that makes sense.
-#ifndef OSBDB_DB_TYPE
-#define OSBDB_DB_TYPE DB_HASH
-#endif // OSBDB_DB_TYPE
+#define OSBDB_MAGIC 0x05BDB
/*
* Maximum length of an attribute name.
return out;
}
+class OSBDBException : public std::exception
+{
+ const char *msg;
+
+public:
+ OSBDBException(const char *msg) : msg(msg) { }
+ const char *what() { return msg; }
+};
+
/*
* The object store interface for Berkeley DB.
*/
DbEnv *env;
Db *db;
string device;
+ string env_dir;
bool mounted;
bool opened;
+ bool transactional;
public:
- OSBDB(const char *dev)
- : env(0), db (0), device (dev), mounted(false), opened(false)
+ OSBDB(const char *dev) throw(OSBDBException)
+ : env(0), db (0), device (dev), mounted(false), opened(false),
+ transactional(g_conf.bdbstore_transactional)
{
- /*env = new DbEnv (DB_CXX_NO_EXCEPTIONS);
- env->set_error_stream (&std::cerr);
- // WTF? You can't open an env if you set this flag here, but BDB
- // says you also can't set it after you open the env.
- //env->set_flags (DB_LOG_INMEMORY, 1);
- char *p = strrchr (dev, '/');
- int env_flags = (DB_CREATE | DB_THREAD | DB_INIT_LOCK
- | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOG);
- if (p != NULL)
- {
- *p = '\0';
- if (env->open (dev, env_flags, 0) != 0)
- {
- std::cerr << "failed to open environment: "
- << dev << std::endl;
- ::abort();
- }
- *p = '/';
- dev = p+1;
- }
- else
- {
- if (env->open (NULL, env_flags, 0) != 0)
- {
- std::cerr << "failed to open environment: ." << std::endl;
- ::abort();
- }
- }
-
- // Double WTF: if you remove the DB_LOG_INMEMORY bit, db->open
- // fails, inexplicably, with EINVAL!*/
- // env->set_flags (DB_DIRECT_DB | /*DB_AUTO_COMMIT |*/ DB_LOG_INMEMORY, 1);
}
~OSBDB()
{
umount();
}
- if (env != NULL)
- {
- env->close (0);
- delete env;
- }
}
int mount();
void sync();
private:
- int opendb (DBTYPE type=DB_UNKNOWN, int flags=0);
+ int opendb (DBTYPE type=DB_UNKNOWN, int flags=0, bool new_env=false);
int _setattr(object_t oid, const char *name, const void *value,
- size_t size, Context *onsync);
+ size_t size, Context *onsync, DbTxn *txn);
int _getattr(object_t oid, const char *name, void *value, size_t size);
};
{
static hash<object_t> H;
assert(sizeof(oid) == 16);
+#ifdef __LP64__
+ sprintf(s, "%s/objects/%02lx/%016lx.%016lx", basedir.c_str(), H(oid) & HASH_MASK,
+ *((__uint64_t*)&oid),
+ *(((__uint64_t*)&oid) + 1));
+#else
sprintf(s, "%s/objects/%02x/%016llx.%016llx", basedir.c_str(), H(oid) & HASH_MASK,
*((__uint64_t*)&oid),
*(((__uint64_t*)&oid) + 1));
+#endif
}
void FakeStore::get_cdir(coll_t cid, char *s)
{
assert(sizeof(cid) == 8);
+#ifdef __LP64__
+ sprintf(s, "%s/collections/%016lx", basedir.c_str(),
+ cid);
+#else
sprintf(s, "%s/collections/%016llx", basedir.c_str(),
cid);
+#endif
}
void FakeStore::get_coname(coll_t cid, object_t oid, char *s)
{
assert(sizeof(oid) == 16);
+#ifdef __LP64__
+ sprintf(s, "%s/collections/%016lx/%016lx.%016lx", basedir.c_str(), cid,
+ *((__uint64_t*)&oid),
+ *(((__uint64_t*)&oid) + 1));
+#else
sprintf(s, "%s/collections/%016llx/%016llx.%016llx", basedir.c_str(), cid,
*((__uint64_t*)&oid),
*(((__uint64_t*)&oid) + 1));
+#endif
}
gethostname(hostname,100);
sprintf(dev_path, "%s/osd%d", ebofs_base_path, whoami);
-
+
struct stat sta;
if (::lstat(dev_path, &sta) != 0)
sprintf(dev_path, "%s/osd.%s", ebofs_base_path, hostname);
if (dest.is_osd()) {
// failed osd. drop message, report to mon.
int mon = monmap->pick_mon();
- dout(0) << "ms_handle_failure " << dest << " inst " << inst
+ dout(0) << "ms_handle_failure " << inst
<< ", dropping and reporting to mon" << mon
+ << " " << *m
<< endl;
messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()),
monmap->get_inst(mon));
} else if (dest.is_mon()) {
// resend to a different monitor.
int mon = monmap->pick_mon(true);
- dout(0) << "ms_handle_failure " << dest << " inst " << inst
+ dout(0) << "ms_handle_failure " << inst
<< ", resending to mon" << mon
+ << " " << *m
<< endl;
messenger->send_message(m, monmap->get_inst(mon));
}
else {
// client?
- dout(0) << "ms_handle_failure " << dest << " inst " << inst
- << ", dropping" << endl;
+ dout(0) << "ms_handle_failure " << inst
+ << ", dropping " << *m << endl;
delete m;
}
}
struct stat st;
int r = store->stat(oid, &st);
if (r >= 0) {
- if (op->get_offset() + op->get_length() >= st.st_size) {
+ if (op->get_offset() + (off_t)op->get_length() >= (off_t)st.st_size) {
if (op->get_offset())
t.truncate(oid, op->get_length() + op->get_offset());
else
type = PG_TYPE_STARTOSD;
}
break;
+
+ default:
+ assert(0);
}
// construct final PG
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_filer) cout << g_clock.now() << " " << objecter->messenger->get_myaddr() << ".filer "
+#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_filer) cout << g_clock.now() << " " << objecter->messenger->get_myname() << ".filer "
class Filer::C_Probe : public Context {
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << g_clock.now() << " " << objecter->messenger->get_myaddr() << ".journaler "
-#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << g_clock.now() << " " << objecter->messenger->get_myaddr() << ".journaler "
+#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << g_clock.now() << " " << objecter->messenger->get_myname() << ".journaler "
+#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << g_clock.now() << " " << objecter->messenger->get_myname() << ".journaler "
if (!g_conf.journaler_allow_split_entries) {
// will we span a stripe boundary?
int p = inode.layout.stripe_size;
- if (write_pos / p != (write_pos + bl.length() + sizeof(s)) / p) {
+ if (write_pos / p != (write_pos + (off_t)(bl.length() + sizeof(s))) / p) {
// yes.
// move write_pos forward.
off_t owp = write_pos;
// start reading some more?
if (!_is_reading()) {
if (s)
- fetch_len = MAX(fetch_len, sizeof(s)+s-read_buf.length());
+ fetch_len = MAX(fetch_len, (off_t)(sizeof(s)+s-read_buf.length()));
_issue_read(fetch_len);
}
/*** ObjectCacher::Object ***/
#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << oc->objecter->messenger->get_myaddr() << ".objectcacher.object(" << oid << ") "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << g_clock.now() << " " << oc->objecter->messenger->get_myname() << ".objectcacher.object(" << oid << ") "
ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
dout(10) << "merge_left result " << *left << endl;
}
-/* buggy possibly, but more importnatly, unnecessary.
-void ObjectCacher::Object::merge_right(BufferHead *left, BufferHead *right)
-{
- assert(left->end() == right->start());
- assert(left->get_state() == right->get_state());
-
- dout(10) << "merge_right " << *left << " + " << *right << endl;
- oc->bh_remove(this, left);
- oc->bh_stat_sub(right);
- data.erase(right->start());
- right->set_start( left->start() );
- data[right->start()] = right;
- right->set_length( left->length() + right->length());
- oc->bh_stat_add(right);
-
- // data
- bufferlist nbl;
- nbl.claim(left->bl);
- nbl.claim_append(right->bl);
- right->bl.claim(nbl);
-
- // version
- // note: this is sorta busted, but should only be used for dirty buffers
- right->last_write_tid = MAX( left->last_write_tid, right->last_write_tid );
-
- // waiters
- map<off_t,list<Context*> > old;
- old.swap(right->waitfor_read);
- // take left's waiters
- right->waitfor_read.swap(left->waitfor_read);
-
- // shift old waiters
- for (map<off_t, list<Context*> >::iterator p = old.begin();
- p != old.end();
- p++)
- right->waitfor_read[p->first + left->length()].swap( p->second );
-
- // hose left
- delete left;
-
- dout(10) << "merge_right result " << *right << endl;
-}
-*/
/*
* map a range of bytes into buffer_heads.
/*** ObjectCacher ***/
#undef dout
-#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << objecter->messenger->get_myaddr() << ".objectcacher "
+#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << g_clock.now() << " " << objecter->messenger->get_myname() << ".objectcacher "
+
/* private */
+void ObjectCacher::close_object(Object *ob)
+{
+ dout(10) << "close_object " << *ob << endl;
+ assert(ob->can_close());
+
+ // ok!
+ objects.erase(ob->get_oid());
+ objects_by_ino[ob->get_ino()].erase(ob);
+ if (objects_by_ino[ob->get_ino()].empty())
+ objects_by_ino.erase(ob->get_ino());
+ delete ob;
+}
+
+
+
+
void ObjectCacher::bh_read(BufferHead *bh, ExtCap *read_ext_cap)
{
dout(7) << "bh_read on " << *bh << endl;
}
dout(10) << "readx result is " << rd->bl->length() << endl;
+ // done with read.
+ delete rd;
+
trim();
return pos;
}
+// purge. non-blocking. violently removes dirty buffers from cache.
+void ObjectCacher::purge(Object *ob)
+{
+ dout(10) << "purge " << *ob << endl;
+
+ for (map<off_t,BufferHead*>::iterator p = ob->data.begin();
+ p != ob->data.end();
+ p++) {
+ BufferHead *bh = p->second;
+ dout(0) << "purge forcibly removing " << *bh << endl;
+ bh_remove(ob, bh);
+ delete bh;
+ }
+
+ if (ob->can_close()) {
+ dout(10) << "trim trimming " << *ob << endl;
+ close_object(ob);
+ }
+}
+
// flush. non-blocking. no callback.
// true if clean, already flushed.
// false if we wrote something.
return false;
}
+void ObjectCacher::purge_set(inodeno_t ino)
+{
+ if (objects_by_ino.count(ino) == 0) {
+ dout(10) << "purge_set on " << ino << " dne" << endl;
+ return;
+ }
+
+ dout(10) << "purge_set " << ino << endl;
+
+ set<Object*>& s = objects_by_ino[ino];
+ for (set<Object*>::iterator i = s.begin();
+ i != s.end();
+ i++) {
+ Object *ob = *i;
+ purge(ob);
+ }
+}
+
off_t ObjectCacher::release(Object *ob)
{
for (list<BufferHead*>::iterator p = clean.begin();
p != clean.end();
- p++)
+ p++) {
bh_remove(ob, *p);
+ delete *p;
+ }
+
+ if (ob->can_close()) {
+ dout(10) << "trim trimming " << *ob << endl;
+ close_object(ob);
+ }
return o_unclean;
}
dout(10) << "release_set " << ino << endl;
- set<Object*>& s = objects_by_ino[ino];
+ set<Object*> s = objects_by_ino[ino];
for (set<Object*>::iterator i = s.begin();
i != s.end();
i++) {
last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
{}
+ ~Object() {
+ assert(data.empty());
+ }
object_t get_oid() { return oid; }
inodeno_t get_ino() { return ino; }
objects_by_ino[ino].insert(o);
return o;
}
- void close_object(Object *ob) {
- assert(ob->can_close());
-
- // ok!
- objects.erase(ob->get_oid());
- objects_by_ino[ob->get_ino()].erase(ob);
- if (objects_by_ino[ob->get_ino()].empty())
- objects_by_ino.erase(ob->get_ino());
- delete ob;
- }
+ void close_object(Object *ob);
// bh stats
Cond stat_cond;
void bh_add(Object *ob, BufferHead *bh) {
ob->add_bh(bh);
- if (bh->is_dirty())
+ if (bh->is_dirty()) {
lru_dirty.lru_insert_top(bh);
- else
+ dirty_bh.insert(bh);
+ } else {
lru_rest.lru_insert_top(bh);
+ }
bh_stat_add(bh);
}
void bh_remove(Object *ob, BufferHead *bh) {
ob->remove_bh(bh);
- if (bh->is_dirty())
+ if (bh->is_dirty()) {
lru_dirty.lru_remove(bh);
- else
+ dirty_bh.erase(bh);
+ } else {
lru_rest.lru_remove(bh);
+ }
bh_stat_sub(bh);
}
bool flush(Object *o);
off_t release(Object *o);
+ void purge(Object *o);
void rdlock(Object *o);
void rdunlock(Object *o);
flusher_thread.create();
}
~ObjectCacher() {
- //lock.Lock(); // hmm.. watch out for deadlock!
+ // we should be empty.
+ assert(objects.empty());
+ assert(lru_rest.lru_get_size() == 0);
+ assert(lru_dirty.lru_get_size() == 0);
+ assert(dirty_bh.empty());
+
+ assert(flusher_thread.is_started());
+ lock.Lock(); // hmm.. watch out for deadlock!
flusher_stop = true;
flusher_cond.Signal();
- //lock.Unlock();
+ lock.Unlock();
flusher_thread.join();
}
bool commit_set(inodeno_t ino, Context *oncommit);
void commit_all(Context *oncommit=0);
+ void purge_set(inodeno_t ino);
+
off_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean)
void kick_sync_writers(inodeno_t ino);
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << g_clock.now() << " " << messenger->get_myaddr() << ".objecter "
-#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << g_clock.now() << " " << messenger->get_myaddr() << ".objecter "
+#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cout << g_clock.now() << " " << messenger->get_myname() << ".objecter "
+#define derr(x) if (x <= g_conf.debug || x <= g_conf.debug_objecter) cerr << g_clock.now() << " " << messenger->get_myname() << ".objecter "
// messages ------------------------------
ObjectExtent &ex = st->extents.front();
PG &pg = get_pg( ex.pgid );
- // send
+ // pick tid
last_tid++;
assert(client_inc >= 0);
- MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
- OSD_OP_STAT);
+
+ // add to gather set
+ st->tid = last_tid;
+ op_stat[last_tid] = st;
+
+ pg.active_tids.insert(last_tid);
+
+ // send?
dout(10) << "stat_submit " << st << " tid " << last_tid
<< " oid " << ex.oid
<< " pg " << ex.pgid
<< " osd" << pg.acker()
<< endl;
- if (pg.acker() >= 0)
+ if (pg.acker() >= 0) {
+ MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+ ex.oid, ex.pgid, osdmap->get_epoch(),
+ OSD_OP_STAT);
+
messenger->send_message(m, osdmap->get_inst(pg.acker()));
+ }
- // add to gather set
- st->tid = last_tid;
- op_stat[last_tid] = st;
-
- pg.active_tids.insert(last_tid);
-
return last_tid;
}
// find OSD
PG &pg = get_pg( ex.pgid );
- // send
+ // pick tid
last_tid++;
assert(client_inc >= 0);
- MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
- OSD_OP_READ);
- m->set_length(ex.length);
- m->set_offset(ex.start);
- // set ext cap
- // FIXME mds currently is writing without caps...so we let it
- // all other (client) writes should have cap
- if (m->get_client().is_client()) {
- m->set_user(rd->ext_cap->get_uid());
- m->set_capability(rd->ext_cap);
- }
+
+ // add to gather set
+ rd->ops[last_tid] = ex;
+ op_read[last_tid] = rd;
+
+ pg.active_tids.insert(last_tid);
+
+ // send?
dout(10) << "readx_submit " << rd << " tid " << last_tid
<< " oid " << ex.oid << " " << ex.start << "~" << ex.length
<< " (" << ex.buffer_extents.size() << " buffer fragments)"
<< " osd" << pg.acker()
<< endl;
- if (pg.acker() >= 0)
+ if (pg.acker() >= 0) {
+ MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
+ ex.oid, ex.pgid, osdmap->get_epoch(),
+ OSD_OP_READ);
+ m->set_length(ex.length);
+ m->set_offset(ex.start);
+
+ // set ext cap
+ // FIXME mds currently is writing without caps...so we let it
+ // all other (client) writes should have cap
+ if (m->get_client().is_client()) {
+ m->set_user(rd->ext_cap->get_uid());
+ m->set_capability(rd->ext_cap);
+ }
+
messenger->send_message(m, osdmap->get_inst(pg.acker()));
+ }
- // add to gather set
- rd->ops[last_tid] = ex;
- op_read[last_tid] = rd;
-
- pg.active_tids.insert(last_tid);
-
return last_tid;
}
// find
PG &pg = get_pg( ex.pgid );
- // send
+ // pick tid
tid_t tid;
if (usetid > 0)
tid = usetid;
else
tid = ++last_tid;
- MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
- wr->op);
- m->set_length(ex.length);
- m->set_offset(ex.start);
- m->set_rev(ex.rev);
- // only cap for a write, fix later
- // FIXME mds does writes through this interface without a cap
- // we let it for now
- if (wr->op == OSD_OP_WRITE && m->get_client().is_client()) {
- m->set_user(wr->modify_cap->get_uid());
- m->set_capability(wr->modify_cap);
- }
-
- if (wr->tid_version.count(tid))
- m->set_version(wr->tid_version[tid]); // we're replaying this op!
-
- // what type of op?
- switch (wr->op) {
- case OSD_OP_WRITE:
- {
- // map buffer segments into this extent
- // (may be fragmented bc of striping)
- bufferlist cur;
- for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
- bit != ex.buffer_extents.end();
- bit++) {
- bufferlist thisbit;
- thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second);
- cur.claim_append(thisbit);
- }
- assert(cur.length() == ex.length);
- m->set_data(cur);//.claim(cur);
- }
- break;
- }
-
// add to gather set
wr->waitfor_ack[tid] = ex;
wr->waitfor_commit[tid] = ex;
++num_unacked;
++num_uncommitted;
- // send
+ // send?
dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
<< " oid " << ex.oid
<< " " << ex.start << "~" << ex.length
<< " pg " << ex.pgid
<< " osd" << pg.primary()
<< endl;
- if (pg.primary() >= 0)
+ if (pg.primary() >= 0) {
+ MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
+ ex.oid, ex.pgid, osdmap->get_epoch(),
+ wr->op);
+ m->set_length(ex.length);
+ m->set_offset(ex.start);
+ m->set_rev(ex.rev);
+
+ // only cap for a write, fix later
+ // FIXME mds does writes through this interface without a cap
+ // we let it for now
+ if (wr->op == OSD_OP_WRITE && m->get_client().is_client()) {
+ m->set_user(wr->modify_cap->get_uid());
+ m->set_capability(wr->modify_cap);
+ }
+
+ if (wr->tid_version.count(tid))
+ m->set_version(wr->tid_version[tid]); // we're replaying this op!
+
+ // what type of op?
+ switch (wr->op) {
+ case OSD_OP_WRITE:
+ {
+ // map buffer segments into this extent
+ // (may be fragmented bc of striping)
+ bufferlist cur;
+ for (map<size_t,size_t>::iterator bit = ex.buffer_extents.begin();
+ bit != ex.buffer_extents.end();
+ bit++) {
+ bufferlist thisbit;
+ thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second);
+ cur.claim_append(thisbit);
+ }
+ assert(cur.length() == ex.length);
+ m->set_data(cur);//.claim(cur);
+ }
+ break;
+ }
+
messenger->send_message(m, osdmap->get_inst(pg.primary()));
+ }
dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl;
#$e = './tcpsynobfs' if $h->{'fs'} eq 'obfs';
my $c = "$e";
$c .= " --mkfs" unless $h->{'no_mkfs'};
- $c .= " --$h->{'fs'}";
+ $c .= " --$h->{'fs'}" if $h->{'fs'};
$c .= " --syn until $h->{'until'}" if $h->{'until'};
$c .= " --syn writefile $h->{'writefile_mb'} $h->{'writefile_size'}" if $h->{'writefile'};
#include <iostream>
#include <cerrno>
+#include <vector>
#include <fcntl.h>
#include <sys/mount.h>
int main (int argc, char **argv)
{
+ vector<char *> args;
char *osd_name = "ebofs";
unsigned object_size = 1024;
unsigned object_count = 1024;
char *mountcmd = "mount /tmp/testos";
char *umountcmd = "umount /tmp/testos";
+ bool ebofs_raw_device = false;
bool inhibit_remount = (getenv("TESTOS_INHIBIT_REMOUNT") != NULL);
if (argc > 1
&& (strcmp (argv[1], "-h") == 0
|| strcmp (argv[1], "-help") == 0
- || strcmp (argv[1], "--help") == 0
- || argc > 6))
+ || strcmp (argv[1], "--help") == 0))
{
cout << "usage: " << argv[0] << " [store [object-size [object-count [iterations [seed]]]]]" << endl;
cout << endl;
exit (0);
}
+ argv_to_vec (argc, argv, args);
+ for (vector<char*>::iterator it = args.begin(); it != args.end();
+ it++)
+ cout << *it << " ";
+ cout << endl;
+ parse_config_options (args);
+ for (vector<char*>::iterator it = args.begin(); it != args.end();
+ it++)
+ cout << *it << " ";
+ cout << endl;
+
+ argc = args.size();
+ if (argc > 0)
+ osd_name = args[0];
if (argc > 1)
- osd_name = argv[1];
+ object_size = (unsigned) atol (args[1]);
if (argc > 2)
- object_size = (unsigned) atol (argv[2]);
+ object_count = (unsigned) atol (args[2]);
if (argc > 3)
- object_count = (unsigned) atol (argv[3]);
+ write_iter = (unsigned) atol (args[3]);
if (argc > 4)
- write_iter = (unsigned) atol (argv[4]);
- if (argc > 5)
- random_seed = (unsigned) atol (argv[5]);
+ random_seed = (unsigned) atol (args[4]);
// algin object size to 'long'
object_size = ((object_size + (sizeof (long) - 1)) / sizeof (long)) * sizeof (long);
strcpy (osd_file, "/tmp/testos/testos.XXXXXX");
mktemp (osd_file);
+ if (strcasecmp (osd_name, "ebofs") == 0)
+ {
+ char *dev_env = getenv ("TESTOS_EBOFS_DEV");
+ if (dev_env != NULL)
+ {
+ // Assume it is a true device.
+ strncpy (osd_file, dev_env, 32);
+ inhibit_remount = true;
+ ebofs_raw_device = true;
+ }
+ }
+
if (!inhibit_remount)
{
if (system (mountcmd) != 0)
ObjectStore *os = NULL;
if (strcasecmp (osd_name, "ebofs") == 0)
{
- FILE *f = fopen (osd_file, "w");
- if (f == NULL)
+ if (!ebofs_raw_device)
{
- cerr << "failed to open " << osd_file << ": " << strerror (errno)
- << endl;
- exit (1);
+ FILE *f = fopen (osd_file, "w");
+ if (f == NULL)
+ {
+ cerr << "failed to open " << osd_file << ": " << strerror (errno)
+ << endl;
+ exit (1);
+ }
+ // 1G file.
+ fseek (f, 1024 * 1024 * 1024, SEEK_SET);
+ fputc ('\0', f);
+ fclose (f);
}
- // 1G file.
- fseek (f, 1024 * 1024 * 1024, SEEK_SET);
- fputc ('\0', f);
- fclose (f);
- // 20K cache
- g_conf.ebofs_bc_size = 5; // times 4K
os = new Ebofs (osd_file);
}
else if (strcasecmp (osd_name, "osbdb") == 0)
{
- char *e = getenv ("OSBDB_FFACTOR");
- if (e != NULL)
- g_conf.bdbstore_ffactor = atol(e);
- e = getenv ("OSBDB_NELEM");
- if (e != NULL)
- g_conf.bdbstore_nelem = atol(e);
- e = getenv ("OSBDB_PAGESIZE");
- if (e != NULL)
- g_conf.bdbstore_pagesize = atol(e);
- g_conf.debug_bdbstore = 1;
- // 20K cache
- g_conf.bdbstore_cachesize = 20 * 1024;
os = new OSBDB (osd_file);
}
else if (strcasecmp (osd_name, "osbdb-btree") == 0)
{
g_conf.bdbstore_btree = true;
- // 20K cache
- g_conf.bdbstore_cachesize = 20 * 1024;
os = new OSBDB (osd_file);
}
else
cerr << "write " << oids[o] << " failed: "
<< strerror (-ret) << endl;
}
+ os->sync();
+
utime_t end = g_clock.now() - begin;
cerr << "Write finished in " << end << endl;
total_write += end;
writes[i] = end;
- os->sync();
os->umount();
sync();
os->mount();
+ // Shuffle the OIDs.
+ for (int j = 0; j < object_count; j++)
+ {
+ int x = random() % object_count;
+ if (x < 0)
+ x = -x;
+ object_t o = oids[j];
+ oids[j] = oids[x];
+ oids[x] = o;
+ }
+
begin = g_clock.now();
for (unsigned o = 0; o < object_count; o++)
{
cerr << "Finished in " << (total_write + total_read) << endl;
- double write_mean = (double) total_write / write_iter;
+ double write_mean = ((double) total_write) / ((double) write_iter);
double write_sd = 0.0;
for (unsigned i = 0; i < write_iter; i++)
{
- double x = (double) writes[i] - write_mean;
+ double x = ((double) writes[i]) - write_mean;
write_sd += x * x;
}
- write_sd = sqrt (write_sd / write_iter);
+ write_sd = sqrt (write_sd / ((double) write_iter));
- double read_mean = (double) total_read / write_iter;
+ double read_mean = ((double) total_read) / ((double) write_iter);
double read_sd = 0.0;
for (unsigned i = 0; i < write_iter; i++)
{
- double x = (double) reads[i] - read_mean;
+ double x = ((double) reads[i]) - read_mean;
write_sd += x * x;
}
- read_sd = sqrt (read_sd / write_iter);
+ read_sd = sqrt (read_sd / ((double) write_iter));
cout << "TESTOS: write " << osd_name << ":" << object_size << ":"
<< object_count << ":" << write_iter << ":" << random_seed
<< " -- " << write_mean << " " << write_sd << endl;
+ cout << "TESTOS: write.raw -- ";
+ for (int i = 0; i < write_iter; i++)
+ cout << ((double) writes[i]) << " ";
+ cout << endl;
+
cout << "TESTOS: read " << osd_name << ":" << object_size << ":"
<< object_count << ":" << write_iter << ":" << random_seed
<< " -- " << read_mean << " " << read_sd << endl;
+ cout << "TESTOS: read.raw -- ";
+ for (int i = 0; i < write_iter; i++)
+ cout << ((double) reads[i]) << " ";
+ cout << endl;
+
unlink (osd_file);
if (!inhibit_remount)
{
--- /dev/null
+/* testosbdb.cc -- test OSBDB.
+ Copyright (C) 2007 Casey Marshall <csm@soe.ucsc.edu> */
+
+
+#include <iostream>
+#include "osbdb/OSBDB.h"
+
+using namespace std;
+
+int
+main (int argc, char **argv)
+{
+ vector<char *> args;
+ argv_to_vec (argc, argv, args);
+ parse_config_options (args);
+
+ g_conf.debug_bdbstore = 10;
+ //g_conf.bdbstore_btree = true;
+ char dbfile[256];
+ strncpy (dbfile, "/tmp/testosbdb/db.XXXXXX", 256);
+ mktemp (dbfile);
+ OSBDB *os = new OSBDB(dbfile);
+ auto_ptr<OSBDB> osPtr (os);
+ os->mkfs();
+ os->mount();
+
+ // Put an object.
+ object_t oid (0xDEADBEEF00000000ULL, 0xFEEDFACE);
+
+ cout << "sizeof oid_t is " << sizeof (oid_t) << endl;
+ cout << "offsetof oid_t.id " << offsetof (oid_t, id) << endl;
+
+ cout << sizeof (object_t) << endl;
+ cout << sizeof (oid.ino) << endl;
+ cout << sizeof (oid.bno) << endl;
+ cout << sizeof (oid.rev) << endl;
+
+ // Shouldn't be there.
+ if (os->exists (oid))
+ {
+ cout << "FAIL: oid shouldn't be there " << oid << endl;
+ }
+
+ // Write an object.
+ char *x = (char *) malloc (1024);
+ memset(x, 0xaa, 1024);
+ bufferptr bp (x, 1024);
+ bufferlist bl;
+ bl.push_back (bp);
+
+ if (os->write (oid, 0L, 1024, bl, NULL) != 1024)
+ {
+ cout << "FAIL: writing object" << endl;
+ }
+
+ os->sync();
+
+ // Should be there.
+ if (!os->exists (oid))
+ {
+ cout << "FAIL: oid should be there: " << oid << endl;
+ }
+
+ memset(x, 0, 1024);
+ if (os->read (oid, 0, 1024, bl) != 1024)
+ {
+ cout << "FAIL: reading object" << endl;
+ }
+
+ for (int i = 0; i < 1024; i++)
+ {
+ if ((x[i] & 0xFF) != 0xaa)
+ {
+ cout << "FAIL: data read out is different" << endl;
+ break;
+ }
+ }
+
+ // Set some attributes
+ if (os->setattr (oid, "alpha", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+ if (os->setattr (oid, "beta", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+ if (os->setattr (oid, "gamma", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+ if (os->setattr (oid, "fred", "value", strlen ("value")) != 0)
+ {
+ cout << "FAIL: set attribute" << endl;
+ }
+
+ char *attrs = (char *) malloc (1024);
+ if (os->listattr (oid, attrs, 1024) != 0)
+ {
+ cout << "FAIL: listing attributes" << endl;
+ }
+ else
+ {
+ char *p = attrs;
+ if (strcmp (p, "alpha") != 0)
+ {
+ cout << "FAIL: should be \"alpha:\" \"" << p << "\"" << endl;
+ }
+ p = p + strlen (p) + 1;
+ if (strcmp (p, "beta") != 0)
+ {
+ cout << "FAIL: should be \"beta:\" \"" << p << "\"" << endl;
+ }
+ p = p + strlen (p) + 1;
+ if (strcmp (p, "fred") != 0)
+ {
+ cout << "FAIL: should be \"fred:\" \"" << p << "\"" << endl;
+ }
+ p = p + strlen (p) + 1;
+ if (strcmp (p, "gamma") != 0)
+ {
+ cout << "FAIL: should be \"gamma:\" \"" << p << "\"" << endl;
+ }
+ }
+
+ char attrvalue[256];
+ memset(attrvalue, 0, sizeof (attrvalue));
+ if (os->getattr (oid, "alpha", attrvalue, sizeof(attrvalue)) < 0)
+ {
+ cout << "FAIL: getattr alpha" << endl;
+ }
+ else if (strncmp ("value", attrvalue, strlen("value")) != 0)
+ {
+ cout << "FAIL: read attribute value differs" << endl;
+ }
+ memset(attrvalue, 0, sizeof (attrvalue));
+ if (os->getattr (oid, "fred", attrvalue, sizeof(attrvalue)) < 0)
+ {
+ cout << "FAIL: getattr fred" << endl;
+ }
+ else if (strncmp ("value", attrvalue, strlen("value")) != 0)
+ {
+ cout << "FAIL: read attribute value differs" << endl;
+ }
+ memset(attrvalue, 0, sizeof (attrvalue));
+ if (os->getattr (oid, "beta", attrvalue, sizeof(attrvalue)) < 0)
+ {
+ cout << "FAIL: getattr beta" << endl;
+ }
+ else if (strncmp ("value", attrvalue, strlen("value")) != 0)
+ {
+ cout << "FAIL: read attribute value differs" << endl;
+ }
+ memset(attrvalue, 0, sizeof (attrvalue));
+ if (os->getattr (oid, "gamma", attrvalue, sizeof(attrvalue)) < 0)
+ {
+ cout << "FAIL: getattr gamma" << endl;
+ }
+ else if (strncmp ("value", attrvalue, strlen("value")) != 0)
+ {
+ cout << "FAIL: read attribute value differs" << endl;
+ }
+
+ if (os->setattr (oid, "alpha", "different", strlen("different")) != 0)
+ cout << "FAIL: setattr overwrite" << endl;
+ memset(attrvalue, 0, sizeof (attrvalue));
+ if (os->getattr (oid, "alpha", attrvalue, sizeof(attrvalue)) < 0)
+ {
+ cout << "FAIL: getattr alpha" << endl;
+ }
+ else if (strncmp ("different", attrvalue, strlen("different")) != 0)
+ {
+ cout << "FAIL: read attribute value differs" << endl;
+ }
+
+ if (os->rmattr (oid, "alpha") != 0)
+ {
+ cout << "FAIL: rmattr alpha" << endl;
+ }
+ if (os->rmattr (oid, "fred") != 0)
+ {
+ cout << "FAIL: rmattr fred" << endl;
+ }
+ if (os->rmattr (oid, "beta") != 0)
+ {
+ cout << "FAIL: rmattr beta" << endl;
+ }
+ if (os->rmattr (oid, "gamma") != 0)
+ {
+ cout << "FAIL: rmattr gamma" << endl;
+ }
+
+ coll_t cid = 0xCAFEBABE;
+ if (os->create_collection (cid) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+ if (os->create_collection (cid + 10) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+ if (os->create_collection (cid + 5) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+ if (os->create_collection (42) != 0)
+ {
+ cout << "FAIL: create_collection" << endl;
+ }
+
+ if (os->collection_add (cid, oid) != 0)
+ {
+ cout << "FAIL: collection_add" << endl;
+ }
+
+ list<coll_t> ls;
+ if (os->list_collections (ls) < 0)
+ {
+ cout << "FAIL: list_collections" << endl;
+ }
+ cout << "collections: ";
+ for (list<coll_t>::iterator it = ls.begin(); it != ls.end(); it++)
+ {
+ cout << *it << ", ";
+ }
+ cout << endl;
+
+ if (os->destroy_collection (0xCAFEBABE + 10) != 0)
+ {
+ cout << "FAIL: destroy_collection" << endl;
+ }
+
+ if (os->destroy_collection (0xCAFEBADE + 10) == 0)
+ {
+ cout << "FAIL: destroy_collection" << endl;
+ }
+
+ object_t oid2 (12345, 12345);
+ for (int i = 0; i < 8; i++)
+ {
+ oid2.rev++;
+ if (os->collection_add (cid, oid2) != 0)
+ {
+ cout << "FAIL: collection_add" << endl;
+ }
+ }
+ for (int i = 0; i < 8; i++)
+ {
+ if (os->collection_remove (cid, oid2) != 0)
+ {
+ cout << "FAIL: collection_remove" << endl;
+ }
+ oid2.rev--;
+ }
+
+ if (os->collection_setattr (cid, "alpha", "value", 5) != 0)
+ cout << "FAIL: collection_setattr" << endl;
+ if (os->collection_setattr (cid, "beta", "value", 5) != 0)
+ cout << "FAIL: collection_setattr" << endl;
+ if (os->collection_setattr (cid, "gamma", "value", 5) != 0)
+ cout << "FAIL: collection_setattr" << endl;
+ if (os->collection_setattr (cid, "fred", "value", 5) != 0)
+ cout << "FAIL: collection_setattr" << endl;
+
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "alpha", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "value", 5) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "beta", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "value", 5) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "gamma", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "value", 5) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "fred", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "value", 5) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+
+ if (os->collection_setattr (cid, "alpha", "eulavvalue", 10) != 0)
+ cout << "FAIL: collection setattr overwrite" << endl;
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "alpha", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "eulavvalue", 10) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "beta", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "value", 5) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "gamma", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "value", 5) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+ memset (attrvalue, 0, sizeof (attrvalue));
+ if (os->collection_getattr (cid, "fred", attrvalue, sizeof (attrvalue)) < 0)
+ cout << "FAIL: collection_getattr" << endl;
+ else if (strncmp (attrvalue, "value", 5) != 0)
+ cout << "FAIL: collection attribute value different" << endl;
+
+ if (os->collection_rmattr (cid, "alpha") != 0)
+ cout << "FAIL: collection_rmattr" << endl;
+ if (os->collection_rmattr (cid, "fred") != 0)
+ cout << "FAIL: collection_rmattr" << endl;
+ if (os->collection_rmattr (cid, "beta") != 0)
+ cout << "FAIL: collection_rmattr" << endl;
+ if (os->collection_rmattr (cid, "gamma") != 0)
+ cout << "FAIL: collection_rmattr" << endl;
+
+ if (os->collection_rmattr (cid, "alpha") == 0)
+ cout << "FAIL: collection_rmattr (nonexistent)" << endl;
+
+ // Truncate the object.
+ if (os->truncate (oid, 512, NULL) != 0)
+ {
+ cout << "FAIL: truncate" << endl;
+ }
+
+ // Expand the object.
+ if (os->truncate (oid, 1200, NULL) != 0)
+ {
+ cout << "FAIL: expand" << endl;
+ }
+
+ // Delete the object.
+ if (os->remove (oid) != 0)
+ {
+ cout << "FAIL: could not remove object" << endl;
+ }
+
+ // Shouldn't be there
+ if (os->exists (oid))
+ {
+ cout << "FAIL: should not be there" << endl;
+ }
+
+ os->sync();
+ exit (0);
+}
--- /dev/null
+# some valgrind suppressions
+# to load these automagically,
+# cat > ~/.valgrindrc
+# --suppressions=valgrind.supp
+# <control-d>
+
+
+# this one makes valgrind shut up about what appears to be a bug in libc's writev.
+{
+ writev uninit bytes thing -sage
+ Memcheck:Param
+ writev(vector[...])
+ fun:writev
+ fun:_ZN11BlockDevice6_writeEijjRN6buffer4listE
+ fun:_ZN11BlockDevice5do_ioEiRSt4listIPNS_6biovecESaIS2_EE
+ fun:_ZN11BlockDevice15io_thread_entryEv
+ fun:_ZN11BlockDevice8IOThread5entryEv
+ fun:_ZN6Thread11_entry_funcEPv
+ fun:start_thread
+ fun:clone
+ obj:*
+ obj:*
+ obj:*
+ obj:*
+}