void IOContext::aio_wait()
{
Mutex::Locker l(lock);
- _aio_wait();
-}
-
-void IOContext::_aio_wait()
-{
// see _aio_thread for waker logic
- while (num_running > 0 || num_reading > 0) {
+ num_waiting.inc();
+ while (num_running.read() > 0 || num_reading.read() > 0) {
dout(10) << __func__ << " " << this
- << " waiting for " << num_running << " aios and/or "
- << num_reading << " readers to complete" << dendl;
+ << " waiting for " << num_running.read() << " aios and/or "
+ << num_reading.read() << " readers to complete" << dendl;
cond.Wait(lock);
}
+ num_waiting.dec();
dout(20) << __func__ << " " << this << " done" << dendl;
}
// ----------------
+#undef dout_prefix
+#define dout_prefix *_dout << "bdev(" << path << ") "
BlockDevice::BlockDevice(aio_callback_t cb, void *cbpriv)
: fd(-1),
size(0), block_size(0),
fs(NULL), aio(false), dio(false),
+ debug_lock("BlockDevice::debug_lock"),
aio_queue(g_conf->bluestore_aio_max_queue_depth),
aio_callback(cb),
aio_callback_priv(cbpriv),
dout(30) << __func__ << " got " << r << " completed aios" << dendl;
for (int i = 0; i < r; ++i) {
IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
- Mutex::Locker l(ioc->lock);
- --ioc->num_running;
+ _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
+ int left = ioc->num_running.dec();
int r = aio[i]->get_return_value();
dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
<< " ioc " << ioc
- << " with " << ioc->num_running << " aios left" << dendl;
+ << " with " << left << " aios left" << dendl;
assert(r >= 0);
- _aio_finish(ioc, aio[i]->offset, aio[i]->length);
- if (ioc->num_running == 0) {
- ioc->running_bl.clear();
- ioc->running_aios.clear();
+ if (left == 0) {
if (ioc->priv) {
aio_callback(aio_callback_priv, ioc->priv);
}
}
+ if (ioc->num_waiting.read()) {
+ dout(20) << __func__ << " waking waiter" << dendl;
+ Mutex::Locker l(ioc->lock);
+ ioc->cond.Signal();
+ }
}
}
}
dout(10) << __func__ << " end" << dendl;
}
-void BlockDevice::_aio_prepare(IOContext *ioc, uint64_t offset, uint64_t length)
+void BlockDevice::_aio_log_start(
+ IOContext *ioc,
+ uint64_t offset,
+ uint64_t length)
{
- dout(20) << __func__ << " " << offset << "~" << length
- << " (" << ioc->blocks << ")" << dendl;
- while (ioc->blocks.intersects(offset, length)) {
- dout(20) << __func__ << " waiting for overlapping io on "
- << offset << "~" << length
- << " (" << ioc->blocks << ")" << dendl;
- if (ioc->num_pending) {
- aio_submit(ioc);
- }
- ioc->_aio_wait();
- dout(20) << __func__ << " done waiting" << dendl;
- }
- ioc->blocks.insert(offset, length);
-
+ dout(20) << __func__ << " " << offset << "~" << length << dendl;
if (g_conf->bdev_debug_inflight_ios) {
+ Mutex::Locker l(debug_lock);
if (debug_inflight.intersects(offset, length)) {
derr << __func__ << " inflight overlap of "
<< offset << "~" << length
}
}
-void BlockDevice::_aio_finish(IOContext *ioc, uint64_t offset, uint64_t length)
+void BlockDevice::_aio_log_finish(
+ IOContext *ioc,
+ uint64_t offset,
+ uint64_t length)
{
- assert(ioc->lock.is_locked());
- dout(20) << __func__ << " " << aio << " " << offset << "~" << length
- << " (" << ioc->blocks << ")"
- << dendl;
- ioc->blocks.erase(offset, length);
- ioc->cond.Signal();
-
+ dout(20) << __func__ << " " << aio << " " << offset << "~" << length << dendl;
if (g_conf->bdev_debug_inflight_ios) {
+ Mutex::Locker l(debug_lock);
debug_inflight.erase(offset, length);
}
}
void BlockDevice::aio_submit(IOContext *ioc)
{
- Mutex::Locker l(ioc->lock);
dout(20) << __func__ << " ioc " << ioc
- << " pending " << ioc->num_pending
- << " running " << ioc->num_running
+ << " pending " << ioc->num_pending.read()
+ << " running " << ioc->num_running.read()
<< dendl;
-
-#warning fixme can we make this avoid a mutex?
// move these aside, and get our end iterator position now, as the
// aios might complete as soon as they are submitted and queue more
// wal aio's.
list<FS::aio_t>::iterator e = ioc->running_aios.begin();
ioc->running_aios.splice(e, ioc->pending_aios);
list<FS::aio_t>::iterator p = ioc->running_aios.begin();
- ioc->num_running += ioc->num_pending;
- ioc->num_pending = 0;
- ioc->running_bl.claim_append(ioc->pending_bl);
+
+ int pending = ioc->num_pending.read();
+ ioc->num_running.add(pending);
+ ioc->num_pending.sub(pending);
+ assert(ioc->num_pending.read() == 0); // we should be only thread doing this
+
bool done = false;
while (!done) {
FS::aio_t& aio = *p;
aio.priv = static_cast<void*>(ioc);
- dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd
+ dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd
<< " " << aio.offset << "~" << aio.length << dendl;
for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
- dout(30) << __func__ << " iov " << (void*)q->iov_base
+ dout(30) << __func__ << " iov " << (void*)q->iov_base
<< " len " << q->iov_len << dendl;
// be careful: as soon as we submit aio we race with completion.
bl.hexdump(*_dout);
*_dout << dendl;
- {
- Mutex::Locker l(ioc->lock);
- _aio_prepare(ioc, off, bl.length());
- }
+ _aio_log_start(ioc, off, bl.length());
#ifdef HAVE_LIBAIO
if (aio && dio) {
ioc->pending_aios.push_back(FS::aio_t(ioc, fd));
- ++ioc->num_pending;
+ ioc->num_pending.inc();
FS::aio_t& aio = ioc->pending_aios.back();
bl.prepare_iov(&aio.iov);
for (unsigned i=0; i<aio.iov.size(); ++i) {
dout(30) << "aio " << i << " " << aio.iov[i].iov_base
<< " " << aio.iov[i].iov_len << dendl;
}
- ioc->pending_bl.append(bl);
+ aio.bl.claim_append(bl);
aio.pwritev(off);
dout(2) << __func__ << " prepared aio " << &aio << dendl;
} else
assert(off < size);
assert(off + len <= size);
- {
- Mutex::Locker l(ioc->lock);
- _aio_prepare(ioc, off, len);
- ++ioc->num_reading;
- }
+ _aio_log_start(ioc, off, len);
+ ioc->num_reading.inc();;
bufferptr p = buffer::create_page_aligned(len);
int r = ::pread(fd, p.c_str(), len, off);
}
pbl->clear();
pbl->push_back(p);
+
dout(40) << "data: ";
pbl->hexdump(*_dout);
*_dout << dendl;
+
out:
- Mutex::Locker l(ioc->lock);
- --ioc->num_reading;
- _aio_finish(ioc, off, len);
+ _aio_log_finish(ioc, off, len);
+ ioc->num_reading.dec();
+ if (ioc->num_waiting.read()) {
+ dout(20) << __func__ << " waking waiter" << dendl;
+ Mutex::Locker l(ioc->lock);
+ ioc->cond.Signal();
+ }
return r < 0 ? r : 0;
}
Mutex lock;
Cond cond;
- interval_set<uint64_t> blocks; ///< blocks with aio in flight
+ //interval_set<uint64_t> blocks; ///< blocks with aio in flight
list<FS::aio_t> pending_aios; ///< not yet submitted
list<FS::aio_t> running_aios; ///< submitting or submitted
- int num_pending;
- int num_running;
- int num_reading;
-
- bufferlist pending_bl; // just a pile of refs
- bufferlist running_bl; // just a pile of refs
+ atomic_t num_pending;
+ atomic_t num_running;
+ atomic_t num_reading;
+ atomic_t num_waiting;
IOContext(void *p)
: priv(p),
- lock("IOContext::lock"),
- num_pending(0),
- num_running(0),
- num_reading(0) {}
+ lock("IOContext::lock")
+ {}
+
+ // no copying
+ IOContext(const IOContext& other);
+ IOContext &operator=(const IOContext& other);
bool has_aios() {
Mutex::Locker l(lock);
- return num_pending + num_running;
+ return num_pending.read() + num_running.read();
}
void aio_wait();
- void _aio_wait();
};
class BlockDevice {
bool aio, dio;
bufferptr zeros;
+ Mutex debug_lock;
interval_set<uint64_t> debug_inflight;
FS::aio_queue_t aio_queue;
int _aio_start();
void _aio_stop();
- void _aio_prepare(IOContext *ioc, uint64_t offset, uint64_t length);
- void _aio_finish(IOContext *ioc, uint64_t offset, uint64_t length);
+ void _aio_log_start(IOContext *ioc, uint64_t offset, uint64_t length);
+ void _aio_log_finish(IOContext *ioc, uint64_t offset, uint64_t length);
int _lock();
}
}
+/*static void aio_cb(void *priv, void *priv2)
+{
+ BlueFS *fs = static_cast<BlueFS*>(priv);
+ if (priv2)
+ fs->_aio_finish(priv2);
+ }*/
+
int BlueFS::add_block_device(unsigned id, string path)
{
dout(10) << __func__ << " bdev " << id << " path " << path << dendl;
assert(id == bdev.size());
- BlockDevice *b = new BlockDevice(NULL, NULL); // no aio callback; use ioc
+ BlockDevice *b = new BlockDevice(NULL, NULL); //aio_cb, this);
int r = b->open(path);
if (r < 0) {
delete b;
FileRef log_file = new File;
log_file->fnode.ino = 1;
_allocate(0, g_conf->bluefs_max_log_runway, &log_file->fnode.extents);
- log_writer = new FileWriter(log_file);
+ log_writer = new FileWriter(log_file, bdev.size());
// initial txn
log_t.op_init();
super.block_size = bdev[0]->get_block_size();
super.log_fnode = log_file->fnode;
_write_super();
+ _flush_bdev();
super.version = 1;
_write_super();
_flush_bdev();
}
// init freelist
- for (auto p : file_map) {
+ for (auto& p : file_map) {
dout(30) << __func__ << " noting alloc for " << p.second->fnode << dendl;
- for (auto q : p.second->fnode.extents) {
+ for (auto& q : p.second->fnode.extents) {
alloc[q.bdev]->init_rm_free(q.offset, q.length);
}
}
// set up the log for future writes
- log_writer = new FileWriter(_get_file(1));
+ log_writer = new FileWriter(_get_file(1), bdev.size());
assert(log_writer->file->fnode.ino == 1);
log_writer->pos = log_writer->file->fnode.size;
dout(10) << __func__ << " log write pos set to " << log_writer->pos << dendl;
alloc.clear();
block_all.clear();
file_map.clear();
- for (auto p : dir_map) {
+ for (auto& p : dir_map) {
delete p.second;
}
super = bluefs_super_t();
uint64_t off = (super.version & 1) ?
super.super_b_offset : super.super_a_offset;
- bdev[0]->aio_write(off, bl, ioc[0]);
- _submit_bdev();
+ IOContext ioc(NULL);
+ bdev[0]->aio_write(off, bl, &ioc);
+ bdev[0]->aio_submit(&ioc);
+ ioc.aio_wait();
dout(20) << __func__ << " v " << super.version << " crc " << crc
<< " offset " << off << dendl;
return 0;
dout(20) << __func__ << " destroying " << file->fnode << dendl;
assert(file->num_reading.read() == 0);
log_t.op_file_remove(file->fnode.ino);
- for (auto r : file->fnode.extents) {
+ for (auto& r : file->fnode.extents) {
alloc[r.bdev]->release(r.offset, r.length);
}
file_map.erase(file->fnode.ino);
int avg_file_size = 12;
uint64_t size = 4096 * 2;
size += file_map.size() * (1 + sizeof(bluefs_fnode_t));
- for (auto p : block_all)
+ for (auto& p : block_all)
size += p.num_intervals() * (1 + 1 + sizeof(uint64_t) * 2);
size += dir_map.size() + (1 + avg_dir_size);
size += file_map.size() * (1 + avg_dir_size + avg_file_size);
t.op_alloc_add(bdev, q.get_start(), q.get_len());
}
}
- for (auto p : file_map) {
+ for (auto& p : file_map) {
if (p.first == 1)
continue;
dout(20) << __func__ << " op_file_update " << p.second->fnode << dendl;
t.op_file_update(p.second->fnode);
}
- for (auto p : dir_map) {
+ for (auto& p : dir_map) {
dout(20) << __func__ << " op_dir_create " << p.first << dendl;
t.op_dir_create(p.first);
- for (auto q : p.second->file_map) {
+ for (auto& q : p.second->file_map) {
dout(20) << __func__ << " op_dir_link " << p.first << "/" << q.first
<< " to " << q.second->fnode.ino << dendl;
t.op_dir_link(p.first, q.first, q.second->fnode.ino);
delete log_writer;
log_file->fnode.size = bl.length();
- log_writer = new FileWriter(log_file);
+ log_writer = new FileWriter(log_file, bdev.size());
log_writer->append(bl);
_flush(log_writer);
_flush_bdev();
dout(10) << __func__ << " release old log extents " << old_extents << dendl;
- for (auto r : old_extents) {
+ for (auto& r : old_extents) {
alloc[r.bdev]->release(r.offset, r.length);
}
}
x_off -= partial;
offset -= partial;
length += partial;
+ dout(20) << __func__ << " waiting for previous aio to complete" << dendl;
+ for (auto p : h->iocv) {
+ p->aio_wait();
+ }
}
if (length == partial + h->buffer.length()) {
bl.claim_append(h->buffer);
z.zero();
t.append(z);
}
- bdev[0]->aio_write(p->offset + x_off, t, ioc[0]);
+ bdev[0]->aio_write(p->offset + x_off, t, h->iocv[0]);
bloff += x_len;
length -= x_len;
++p;
x_off = 0;
}
- _submit_bdev();
+ for (unsigned i = 0; i < bdev.size(); ++i) {
+ if (!h->iocv[i]->pending_aios.empty()) {
+ bdev[i]->aio_submit(h->iocv[i]);
+ }
+ }
dout(20) << __func__ << " h " << h << " pos now " << h->pos << dendl;
-
return 0;
}
+/*
+void BlueFS::_aio_finish(void *priv)
+{
+ FileWriter *h = static_cast<FileWriter*>(priv);
+ Mutex::Locker l(h->lock);
+ dout(10) << __func__ << " h " << h << " on " << h->file->fnode << dendl;
+ if (--h->num_aio_in_flight == 0) {
+ h->cond.Signal();
+ }
+}
+*/
+
int BlueFS::_flush(FileWriter *h)
{
uint64_t length = h->buffer.length();
}
}
-void BlueFS::_submit_bdev()
-{
- dout(20) << __func__ << dendl;
- for (unsigned i = 0; i < bdev.size(); ++i) {
- bdev[i]->aio_submit(ioc[i]);
- }
-}
-
void BlueFS::_flush_bdev()
{
dout(20) << __func__ << dendl;
- for (auto p : ioc) {
- p->aio_wait();
- }
for (auto p : bdev) {
p->flush();
}
log_t.op_file_update(file->fnode);
}
- *h = new FileWriter(file);
+ *h = new FileWriter(file, bdev.size());
dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl;
return 0;
}
if (dirname.size() == 0) {
// list dirs
ls->reserve(dir_map.size() + 2);
- for (auto q : dir_map) {
+ for (auto& q : dir_map) {
ls->push_back(q.first);
}
} else {
}
Dir *dir = p->second;
ls->reserve(dir->file_map.size() + 2);
- for (auto q : dir->file_map) {
+ for (auto& q : dir->file_map) {
ls->push_back(q.first);
}
}
bufferlist buffer; ///< new data to write (at end of file)
bufferlist tail_block; ///< existing partial block at end of file, if any
- FileWriter(FileRef f) : file(f), pos(0) {
+ Mutex lock;
+ /*Cond cond;
+ bool num_aio_in_flight;
+ */
+ vector<IOContext*> iocv; ///< one for each bdev
+
+ FileWriter(FileRef f, unsigned num_bdev)
+ : file(f),
+ pos(0),
+ lock("BlueFS::FileWriter::lock") { //,
+ //num_aio_in_flight(0) {
file->num_writers.inc();
+ iocv.resize(num_bdev);
+ for (unsigned i = 0; i < num_bdev; ++i) {
+ iocv[i] = new IOContext(NULL);
+ }
}
~FileWriter() {
file->num_writers.dec();
void _maybe_compact_log();
void _compact_log();
- void _submit_bdev();
+ //void _aio_finish(void *priv);
+
void _flush_bdev();
int _preallocate(FileRef f, uint64_t off, uint64_t len);
// no need to hold the global lock here; we only touch h and
// h->file, and read vs write or delete is already protected (via
// atomics and asserts).
+ Mutex::Locker l(lock);
return _read(h, buf, offset, len, outbl, out);
}
void invalidate_cache(FileRef f, uint64_t offset, uint64_t len) {
#define dout_prefix *_dout << "bluestore(" << path << ") "
-void aio_cb(void *priv, void *priv2)
+static void aio_cb(void *priv, void *priv2)
{
BlueStore *store = static_cast<BlueStore*>(priv);
store->_txc_aio_finish(priv2);
alloc = Allocator::create("stupid");
uint64_t num = 0, bytes = 0;
const map<uint64_t,uint64_t>& fl = fm->get_freelist();
- for (auto p : fl) {
+ for (auto& p : fl) {
alloc->init_add_free(p.first, p.second);
++num;
bytes += p.second;
const vector<extent_t>& bluefs_gift_extents)
{
dout(10) << __func__ << dendl;
- for (auto p : bluefs_gift_extents) {
+ for (auto& p : bluefs_gift_extents) {
bluefs->add_block_extent(0, p.offset, p.length);
}
}
if (ols.empty()) {
break;
}
- for (auto oid : ols) {
+ for (auto& oid : ols) {
dout(10) << __func__ << " " << oid << dendl;
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
used_nids.insert(o->onode.nid);
}
// blocks
- for (auto b : o->onode.block_map) {
+ for (auto& b : o->onode.block_map) {
if (used_blocks.contains(b.second.offset, b.second.length)) {
derr << " " << oid << " extent " << b.first << ": " << b.second
<< " already allocated" << dendl;
// overlays
set<string> overlay_keys;
map<uint64_t,int> refs;
- for (auto v : o->onode.overlay_map) {
+ for (auto& v : o->onode.overlay_map) {
if (v.first + v.second.length > o->onode.size) {
derr << " " << oid << " overlay " << v.first << " " << v.second
<< " extends past end of object" << dendl;
++errors;
}
}
- for (auto vr : o->onode.overlay_refs) {
+ for (auto& vr : o->onode.overlay_refs) {
if (refs[vr.first] != vr.second) {
derr << " " << oid << " overlay key " << vr.first
<< " says " << vr.second << " refs but we have "
}
refs.erase(vr.first);
}
- for (auto p : refs) {
+ for (auto& p : refs) {
if (p.second > 1) {
derr << " " << oid << " overlay key " << p.first
<< " has " << p.second << " refs but they are not recorded"
int r = _balance_bluefs_freespace(&bluefs_gift_extents);
assert(r >= 0);
if (r > 0) {
- for (auto p : bluefs_gift_extents) {
+ for (auto& p : bluefs_gift_extents) {
fm->allocate(p.offset, p.length, t);
bluefs_extents.insert(p.offset, p.length);
}
vector<string> dirs;
r = fs.readdir("", &dirs);
assert(r == 0);
- for (auto dir : dirs) {
+ for (auto& dir : dirs) {
if (dir[0] == '.')
continue;
cout << dir << "/" << std::endl;
string cmd = "mkdir -p " + outdir + "/" + dir;
r = system(cmd.c_str());
assert(r == 0);
- for (auto file : ls) {
+ for (auto& file : ls) {
if (file[0] == '.')
continue;
cout << dir << "/" << file << std::endl;
f->dump_unsigned("size", size);
f->dump_stream("mtime") << mtime;
f->open_array_section("extents");
- for (auto p : extents)
+ for (auto& p : extents)
f->dump_object("extent", p);
f->close_section();
}
uint64_t get_allocated() const {
uint64_t r = 0;
- for (auto p : extents)
+ for (auto& p : extents)
r += p.length;
return r;
}
vector<iovec> iov;
uint64_t offset, length;
int rval;
+ bufferlist bl; ///< write payload (so that it remains stable for duration)
aio_t(void *p, int f) : priv(p), fd(f), rval(-1000) {
memset(&iocb, 0, sizeof(iocb));
BlueFS::FileReader *h;
ASSERT_EQ(0, fs.open_for_read("dir", "file", &h));
bufferlist bl;
- ASSERT_EQ(9, fs.read(h, 0, 1024, &bl, NULL));
+ BlueFS::FileReaderBuffer buf(4096);
+ ASSERT_EQ(9, fs.read(h, &buf, 0, 1024, &bl, NULL));
ASSERT_EQ(0, strncmp("foobarbaz", bl.c_str(), 9));
delete h;
}