osd_fakestore_syncthreads: 4,
osd_ebofs: 0,
+ // --- ebofs ---
ebofs_commit_interval: 2, // seconds. 0 = no timeout (for debugging/tracing)
ebofs_bc_size: (50 *256), // measured in 4k blocks, or *256 for MB
- ebofs_bc_max_dirty: (40 *256), // before write() will wait for data to flush
+ ebofs_bc_max_dirty: (10 *256), // before write() will wait for data to flush
+ // --- block device ---
+ bdev_max_el_ms: 1000, // restart elevator at least once every 1000 ms
+
// --- fakeclient (mds regression testing) (ancient history) ---
num_fakeclient: 100,
int osd_fakestore_syncthreads; // such crap
int osd_ebofs;
+ // ebofs
int ebofs_commit_interval;
off_t ebofs_bc_size;
off_t ebofs_bc_max_dirty;
+ // block device
+ int bdev_max_el_ms;
+
+
// fake client
int num_fakeclient;
unsigned fakeclient_requests;
bool dommap;
+ off_t talloc;
+
public:
- AlignedBufferPool(int a) : alignment(a), dommap(true) {}
+ AlignedBufferPool(int a) : alignment(a), dommap(true), talloc(0) {}
~AlignedBufferPool() {
}
void free(char *p, unsigned len) {
- dout(1) << "bufferpool.free " << (void*)p << " len " << len << endl;
+ dout(20) << "bufferpool(" << (void*)this << ").free " << (void*)p << " len " << len << " ... total " << talloc << endl;
+ talloc -= len;
if (dommap)
munmap(p, len);
else
::posix_memalign((void**)&p, alignment, bytes);
assert(p);
+ talloc += bytes;
::memset(p, 0, bytes); // only to shut up valgrind
- dout(1) << "bufferpool.alloc " << (void*)p << endl;
+ dout(20) << "bufferpool(" << (void*)this << ").alloc " << (void*)p << " len " << bytes << " ... total " << talloc << endl;
return new buffer(p, bytes, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE|BUFFER_MODE_CUSTOMFREE,
bytes,
// queue?
if (!io_queue.empty()) {
+ utime_t stop = g_clock.now();
+ utime_t max(0, 1000*g_conf.bdev_max_el_ms); // (s,us), convert ms -> us!
+ stop += max;
+
if (dir_forward) {
// forward sweep
dout(20) << "io_thread forward sweep" << endl;
lock.Unlock();
do_io(biols);
lock.Lock();
+
+ utime_t now = g_clock.now();
+ if (now > stop) break;
}
} else {
// reverse sweep
lock.Unlock();
do_io(biols);
lock.Lock();
+
+ utime_t now = g_clock.now();
+ if (now > stop) break;
}
}
dir_forward = !dir_forward;
ebofs_lock.Lock();
assert(!mounted);
+ int r = dev.open();
+ if (r < 0) {
+ ebofs_lock.Unlock();
+ return r;
+ }
+
// read super
bufferptr bp1 = bufferpool.alloc(EBOFS_BLOCK_SIZE);
bufferptr bp2 = bufferpool.alloc(EBOFS_BLOCK_SIZE);
ebofs_lock.Lock();
assert(!mounted);
+ int r = dev.open();
+ if (r < 0) {
+ ebofs_lock.Unlock();
+ return r;
+ }
+
block_t num_blocks = dev.get_num_blocks();
free_blocks = 0;
dout(3) << "mkfs: cleaning up" << endl;
close_tables();
- dout(1) << "mkfs: done" << endl;
+ dev.close();
+ dout(1) << "mkfs: done" << endl;
ebofs_lock.Unlock();
return 0;
}
// free memory
dout(2) << "umount cleaning up" << endl;
close_tables();
+ dev.close();
dout(1) << "umount done" << endl;
ebofs_lock.Unlock();
// wait for kick, or timeout
if (g_conf.ebofs_commit_interval) {
- commit_cond.WaitInterval(ebofs_lock, utime_t(EBOFS_COMMIT_INTERVAL,0));
+ dout(10) << "commit_thread sleeping (up to) " << g_conf.ebofs_commit_interval << " seconds" << endl;
+ commit_cond.WaitInterval(ebofs_lock, utime_t(g_conf.ebofs_commit_interval,0));
} else {
// DEBUG.. wait until kicked
dout(10) << "commit_thread no commit_interval, waiting until kicked" << endl;
}
}
- dout(10) << "trim_bc finish: size " << bc.get_size() << ", trimmable " << bc.get_trimmable() << ", max " << max << endl;
+ dout(1) << "trim_bc finish: size " << bc.get_size() << ", trimmable " << bc.get_trimmable() << ", max " << max << endl;
/*
dout(10) << "trim_buffer_cache finish: "
}
-const int EBOFS_COMMIT_INTERVAL = 2; // 0 == never
-
class Ebofs : public ObjectStore {
protected:
Mutex ebofs_lock; // a beautiful global lock
// ** super **
- BlockDevice &dev;
+ BlockDevice dev;
bool mounted, unmounting;
bool readonly;
version_t super_epoch;
bool _write_will_block();
public:
- Ebofs(BlockDevice& d) :
- dev(d),
+ Ebofs(char *devfn) :
+ dev(devfn),
mounted(false), unmounting(false), readonly(false),
super_epoch(0), commit_thread_started(false), mid_commit(false),
commit_thread(this),
}
char *filename = args[0];
- // device
- BlockDevice dev(filename);
- if (dev.open() < 0) {
- cerr << "couldn't open " << filename << endl;
- return -1;
- }
-
// mkfs
- Ebofs mfs(dev);
- mfs.mkfs();
+ Ebofs mfs(filename);
+ int r = mfs.mkfs();
+ if (r < 0) exit(r);
if (1) {
// test-o-rama!
- Ebofs fs(dev);
+ Ebofs fs(filename);
fs.mount();
if (1) { // big writes
fs.umount();
}
- dev.close();
}
bdbout(1) << "buffer.malloc " << (void*)_dataptr << endl;
}
~buffer() {
- bdbout(1) << "buffer.des " << *this << endl;
+ bdbout(1) << "buffer.des " << *this << " " << (void*)free_func << endl;
if (free_func) {
bdbout(1) << "buffer.custom_free_func " << free_func_arg << " " << (void*)_dataptr << endl;
free_func( free_func_arg, _dataptr, _alloc_len );
#ifdef USE_EBOFS
# include "ebofs/Ebofs.h"
-# include "ebofs/BlockDevice.h"
#endif
store = new OBFSStore(whoami, NULL, "/dev/sdb3");
#else
# ifdef USE_EBOFS
- storedev = 0;
if (g_conf.osd_ebofs) {
char hostname[100];
hostname[0] = 0;
gethostname(hostname,100);
sprintf(ebofs_path, "%s/%s", ebofs_base_path, hostname);
- storedev = new BlockDevice(ebofs_path);
- store = new Ebofs(*storedev);
+ store = new Ebofs(ebofs_path);
} else
# endif
store = new FakeStore(osd_base_path, whoami);
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
if (store) { delete store; store = 0; }
-#ifdef USE_EBOFS
- if (storedev) { delete storedev; storedev = 0; }
-#endif
}
{
osd_lock.Lock();
-#ifdef USE_EBOFS
- if (storedev)
- storedev->open();
-#endif
if (g_conf.osd_mkfs) store->mkfs();
int r = store->mount();
messenger->shutdown();
int r = store->umount();
-#ifdef USE_EBOFS
- if (storedev) storedev->close();
-#endif
return r;
}
int whoami;
class ObjectStore *store;
-#ifdef USE_EBOFS
- class BlockDevice *storedev; // for ebofs
-#endif
class HostMonitor *monitor;
class Logger *logger;