#ifndef CEPH_OS_FS_H
#define CEPH_OS_FS_H
+#include <errno.h>
+#include <time.h>
+
+#include "acconfig.h"
+#ifdef HAVE_LIBAIO
+# include <libaio.h>
+#endif
+
#include <string>
#include "include/types.h"
+#include "common/Mutex.h"
+#include "common/Cond.h"
class FS {
public:
int from_fd,
uint64_t from_offset, uint64_t from_len);
virtual int zero(int fd, uint64_t offset, uint64_t length);
+
+ // -- aio --
+
+  // One in-flight libaio operation.  The kernel hands back the iocb
+  // pointer on completion, so keeping it as the first member lets
+  // aio_queue_t cast an io_event::obj straight back to an aio_t.
+  struct aio_t {
+    struct iocb iocb;  // must be first element; see shenanigans in aio_queue_t
+    void *priv;        // opaque caller cookie, recovered at completion time
+    int fd;
+    vector<iovec> iov; // scatter/gather buffers for the vectored write
+
+    aio_t(void *p, int f)
+      : iocb(),        // value-init zeroes the embedded iocb (same as memset)
+	priv(p),
+	fd(f) {
+    }
+
+    // Fill in the iocb as a vectored write of iov to fd at offset.
+    void pwritev(uint64_t offset) {
+      io_prep_pwritev(&iocb, fd, iov.data(), iov.size(), offset);
+    }
+  };
+
+  // Thin wrapper around a kernel aio context: init/shutdown, submission
+  // with brief back-off when the queue is full, and single-event reaping.
+  struct aio_queue_t {
+    int max_iodepth;   // queue depth passed to io_setup()
+    io_context_t ctx;  // kernel aio context; 0 when not initialized
+
+    aio_queue_t(unsigned max_iodepth = 8)
+      : max_iodepth(max_iodepth),
+	ctx(0) {
+    }
+    ~aio_queue_t() {
+      assert(ctx == 0);  // shutdown() must be called before destruction
+    }
+
+    /// create the kernel aio context; returns 0 or a negative errno
+    int init() {
+      assert(ctx == 0);
+      return io_setup(max_iodepth, &ctx);
+    }
+    void shutdown() {
+      if (ctx) {
+	int r = io_destroy(ctx);
+	assert(r == 0);
+	ctx = 0;
+      }
+    }
+
+    /// Submit one prepared aio.  Returns 0 on success or a negative errno.
+    /// NB: the previous 'do { ... continue; } while (false)' form was broken:
+    /// 'continue' re-evaluates the (false) condition and exits the loop, so
+    /// an EAGAIN was never retried and fell through to 'return 0', silently
+    /// reporting a failed io_submit as success.
+    int submit(aio_t &aio) {
+      int attempts = 10;
+      iocb *piocb = &aio.iocb;
+      while (true) {
+	int r = io_submit(ctx, 1, &piocb);
+	if (r >= 0)
+	  return 0;
+	if (r == -EAGAIN && attempts-- > 0) {
+	  // submission queue is full; back off briefly and retry
+	  usleep(500);
+	  continue;
+	}
+	return r;  // hard error (or EAGAIN retries exhausted)
+      }
+    }
+
+    /// Wait up to timeout_ms for one completion.  On success stores the
+    /// completed aio in *paio and returns 1; returns 0 on timeout or a
+    /// negative errno on error.
+    int get_next_completed(int timeout_ms, aio_t **paio) {
+      io_event event[1];
+      struct timespec t = {
+	timeout_ms / 1000,
+	(timeout_ms % 1000) * 1000 * 1000
+      };
+      int r = io_getevents(ctx, 1, 1, event, &t);
+      if (r <= 0) {
+	return r;
+      }
+      // event.obj is the iocb*, which is the first member of aio_t
+      *paio = (aio_t *)event[0].obj;
+      return 1;
+    }
+  };
+
};
#endif
cct->_conf->newstore_fsync_thread_timeout,
cct->_conf->newstore_fsync_thread_suicide_timeout,
&fsync_tp),
+ aio_thread(this),
+ aio_stop(false),
+ aio_queue(cct->_conf->newstore_aio_max_queue_depth),
kv_sync_thread(this),
kv_lock("NewStore::kv_lock"),
kv_stop(false),
db = NULL;
}
+// Bring up the libaio completion machinery (no-op unless newstore_aio is
+// enabled): create the kernel aio context and start the completion thread.
+// Returns 0 on success or a negative error from aio_queue.init().
+int NewStore::_aio_start()
+{
+  if (!g_conf->newstore_aio)
+    return 0;
+  dout(10) << __func__ << dendl;
+  int r = aio_queue.init();
+  if (r < 0)
+    return r;
+  aio_thread.create();
+  return 0;
+}
+
+// Tear down the libaio machinery (no-op unless newstore_aio is enabled):
+// signal the completion thread to exit, join it, then destroy the context.
+void NewStore::_aio_stop()
+{
+  if (!g_conf->newstore_aio)
+    return;
+  dout(10) << __func__ << dendl;
+  aio_stop = true;
+  aio_thread.join();
+  aio_stop = false;   // reset so _aio_start() can be called again
+  aio_queue.shutdown();
+}
+
int NewStore::_open_collections()
{
KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
if (r < 0)
goto out_db;
- r = _replay_wal();
+ r = _aio_start();
if (r < 0)
goto out_db;
+ r = _replay_wal();
+ if (r < 0)
+ goto out_aio;
+
finisher.start();
fsync_tp.start();
wal_tp.start();
mounted = true;
return 0;
+ out_aio:
+ _aio_stop();
out_db:
_close_db();
out_frag:
dout(20) << __func__ << " stopping fsync_wq" << dendl;
fsync_tp.stop();
+ dout(20) << __func__ << " stopping aio" << dendl;
+ _aio_stop();
dout(20) << __func__ << " stopping kv thread" << dendl;
_kv_stop();
dout(20) << __func__ << " draining wal_wq" << dendl;
}
}
+// Completion thread for libaio writes.  Polls the aio queue; when the last
+// outstanding aio of a TransContext completes, advances that txc to the
+// fsync stage (or straight past it when no fds need syncing).
+// NOTE(review): aio_stop is only checked between polls, so shutdown latency
+// is bounded by the newstore_aio_poll_ms timeout.
+void NewStore::_aio_thread()
+{
+  dout(10) << __func__ << " start" << dendl;
+  while (!aio_stop) {
+    dout(40) << __func__ << " polling" << dendl;
+    FS::aio_t *aio;
+    int r = aio_queue.get_next_completed(g_conf->newstore_aio_poll_ms, &aio);
+    if (r < 0) {
+      // log and keep polling; a getevents error does not stop the thread
+      derr << __func__ << " got " << cpp_strerror(r) << dendl;
+    }
+    if (r == 1) {
+      // priv was set to the owning TransContext when the aio was created
+      TransContext *txc = static_cast<TransContext*>(aio->priv);
+      // dec() returns the post-decrement count; 0 means this was the last
+      // outstanding aio for the txc
+      int left = txc->num_aio.dec();
+      dout(10) << __func__ << " finished aio on " << txc << ", "
+	       << left << " left" << dendl;
+      if (left == 0) {
+	txc->state = TransContext::STATE_AIO_DONE;
+	if (!txc->fds.empty()) {
+	  _txc_queue_fsync(txc);
+	} else {
+	  _txc_finish_fsync(txc);
+	}
+      }
+    }
+  }
+  dout(10) << __func__ << " end" << dendl;
+}
+
void NewStore::_kv_sync_thread()
{
dout(10) << __func__ << " start" << dendl;
<< cpp_strerror(r) << dendl;
return r;
}
- p->data.write_fd(fd);
+ r = p->data.write_fd(fd);
+ if (r < 0) {
+ derr << __func__ << " write_fd on " << fd << " got: "
+ << cpp_strerror(r) << dendl;
+ return r;
+ }
sync_fds.push_back(fd);
}
break;
_txc_finish_kv(txc);
} else {
// async path
- if (!txc->fds.empty()) {
+ if (!txc->aios.empty()) {
+ txc->state = TransContext::STATE_AIO_QUEUED;
+ dout(20) << __func__ << " submitting " << txc->num_aio.read() << " aios"
+ << dendl;
+ for (list<FS::aio_t>::iterator p = txc->aios.begin();
+ p != txc->aios.end();
+ ++p) {
+ FS::aio_t& aio = *p;
+ dout(20) << __func__ << " submitting aio " << &aio << dendl;
+ for (vector<iovec>::iterator q = aio.iov.begin(); q != aio.iov.end(); ++q)
+ dout(30) << __func__ << " iov " << (void*)q->iov_base
+ << " len " << q->iov_len << dendl;
+ dout(30) << " fd " << aio.fd << " offset " << lseek64(aio.fd, 0, SEEK_CUR)
+ << dendl;
+ int r = aio_queue.submit(*p);
+ if (r) {
+ derr << " aio submit got " << cpp_strerror(r) << dendl;
+ assert(r == 0);
+ }
+ }
+ } else if (!txc->fds.empty()) {
_txc_queue_fsync(txc);
} else {
_txc_finish_fsync(txc);
o->onode.size == 0 ||
o->onode.data_map.empty()) {
_do_overlay_clear(txc, o);
+ uint64_t x_offset;
if (o->onode.data_map.empty()) {
// create
fragment_t &f = o->onode.data_map[0];
r = fd;
goto out;
}
- ::lseek64(fd, offset, SEEK_SET);
+ x_offset = offset;
dout(20) << __func__ << " create " << f.fid << " writing "
<< offset << "~" << length << dendl;
} else {
}
::ftruncate(fd, f.length); // in case there is trailing crap
f.length = (offset + length) - f.offset;
- ::lseek64(fd, offset - f.offset, SEEK_SET);
+ x_offset = offset - f.offset;
dout(20) << __func__ << " append " << f.fid << " writing "
<< (offset - f.offset) << "~" << length << dendl;
}
if (offset + length > o->onode.size) {
o->onode.size = offset + length;
}
- r = bl.write_fd(fd);
- if (r < 0) {
- derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
- goto out;
+#ifdef HAVE_LIBAIO
+ if (g_conf->newstore_aio && (flags & O_DIRECT)) {
+ txc->aios.push_back(FS::aio_t(txc, fd));
+ txc->num_aio.inc();
+ FS::aio_t& aio = txc->aios.back();
+ bl.prepare_iov(&aio.iov);
+ txc->aio_bl.append(bl);
+ aio.pwritev(x_offset);
+
+ dout(2) << __func__ << " prepared aio " << &aio << dendl;
+ } else
+#endif
+ {
+ ::lseek64(fd, x_offset, SEEK_SET);
+ r = bl.write_fd(fd);
+ if (r < 0) {
+ derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
+ goto out;
+ }
}
txc->sync_fd(fd);
r = 0;
dout(20) << __func__ << " replace old fid " << op->fid
<< " with new fid " << f.fid
<< ", writing " << offset << "~" << length << dendl;
- r = bl.write_fd(fd);
- if (r < 0) {
- derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
- goto out;
+
+#ifdef HAVE_LIBAIO
+ if (g_conf->newstore_aio && (flags & O_DIRECT)) {
+ txc->aios.push_back(FS::aio_t(txc, fd));
+ txc->num_aio.inc();
+ FS::aio_t& aio = txc->aios.back();
+ bl.prepare_iov(&aio.iov);
+ txc->aio_bl.append(bl);
+ aio.pwritev(0);
+ dout(2) << __func__ << " prepared aio " << &aio << dendl;
+ } else
+#endif
+ {
+ r = bl.write_fd(fd);
+ if (r < 0) {
+ derr << __func__ << " bl.write_fd error: " << cpp_strerror(r) << dendl;
+ goto out;
+ }
+ txc->sync_fd(fd);
}
- txc->sync_fd(fd);
r = 0;
goto out;
}
#ifndef CEPH_OSD_NEWSTORE_H
#define CEPH_OSD_NEWSTORE_H
+#include "acconfig.h"
+
#include <unistd.h>
#include "include/assert.h"
struct TransContext {
typedef enum {
STATE_PREPARE,
+ STATE_AIO_QUEUED,
+ STATE_AIO_DONE,
STATE_FSYNC_QUEUED,
STATE_FSYNC_FSYNCING,
STATE_FSYNC_DONE,
case STATE_FSYNC_QUEUED: return "fsync_queued";
case STATE_FSYNC_FSYNCING: return "fsync_fsyncing";
case STATE_FSYNC_DONE: return "fsync_done";
+ case STATE_AIO_QUEUED: return "aio_queued";
+ case STATE_AIO_DONE: return "aio_done";
case STATE_KV_QUEUED: return "kv_queued";
case STATE_KV_COMMITTING: return "kv_committing";
case STATE_KV_DONE: return "kv_done";
wal_transaction_t *wal_txn; ///< wal transaction (if any)
unsigned num_fsyncs_completed;
+ list<FS::aio_t> aios;
+ bufferlist aio_bl; // just a pile of refs
+ atomic_t num_aio;
+
Mutex lock;
Cond cond;
onreadable_sync(NULL),
wal_txn(NULL),
num_fsyncs_completed(0),
+ num_aio(0),
lock("NewStore::TransContext::lock") {
//cout << "txc new " << this << std::endl;
}
public:
WALWQ(NewStore *s, time_t ti, time_t sti, ThreadPool *tp)
: ThreadPool::WorkQueue<TransContext>("NewStore::WALWQ", ti, sti, tp),
- store(s) {
+ store(s),
+ ops(0),
+ bytes(0) {
}
bool _empty() {
return wal_queue.empty();
}
};
+  // Thread shim that runs NewStore::_aio_thread() (libaio completion
+  // polling); created by _aio_start() and joined by _aio_stop().
+  struct AioCompletionThread : public Thread {
+    NewStore *store;  // back-pointer to the owning store (non-owning)
+    AioCompletionThread(NewStore *s) : store(s) {}
+    void *entry() {
+      store->_aio_thread();
+      return NULL;
+    }
+  };
+
// --------------------------------------------------------
// members
private:
ThreadPool fsync_tp;
FsyncWQ fsync_wq;
+ AioCompletionThread aio_thread;
+ bool aio_stop;
+ FS::aio_queue_t aio_queue;
+
KVSyncThread kv_sync_thread;
Mutex kv_lock;
Cond kv_cond, kv_sync_cond;
void _osr_reap_done(OpSequencer *osr);
+ void _aio_thread();
+ int _aio_start();
+ void _aio_stop();
+
void _kv_sync_thread();
void _kv_stop() {
{