Nesting the aio types (aio_t, aio_list_t, aio_queue_t) inside class FS never made sense. Move them into os/fs/aio.{h,cc} and fix the users.
Signed-off-by: Sage Weil <sage@redhat.com>
kstore/KStore.cc
kstore/kstore_types.cc
fs/FS.cc
+ fs/aio.cc
${libos_xfs_srcs})
if(HAVE_LIBAIO)
#include <atomic>
#include <condition_variable>
#include <mutex>
+#include <list>
#include "acconfig.h"
-#include "os/fs/FS.h"
+#include "os/fs/aio.h"
#define SPDK_PREFIX "spdk:"
std::mutex lock;
std::condition_variable cond;
- list<FS::aio_t> pending_aios; ///< not yet submitted
- list<FS::aio_t> running_aios; ///< submitting or submitted
+ std::list<aio_t> pending_aios; ///< not yet submitted
+ std::list<aio_t> running_aios; ///< submitting or submitted
std::atomic_int num_pending = {0};
std::atomic_int num_running = {0};
std::atomic_int num_reading = {0};
CephContext* cct;
private:
std::mutex ioc_reap_lock;
- vector<IOContext*> ioc_reap_queue;
+ std::vector<IOContext*> ioc_reap_queue;
std::atomic_int ioc_reap_count = {0};
protected:
typedef void (*aio_callback_t)(void *handle, void *aio);
static BlockDevice *create(
- CephContext* cct, const string& path, aio_callback_t cb, void *cbpriv);
+ CephContext* cct, const std::string& path, aio_callback_t cb, void *cbpriv);
virtual bool supported_bdev_label() { return true; }
virtual bool is_rotational() { return rotational; }
virtual uint64_t get_size() const = 0;
virtual uint64_t get_block_size() const = 0;
- virtual int collect_metadata(string prefix, map<string,string> *pm) const = 0;
+ virtual int collect_metadata(std::string prefix, std::map<std::string,std::string> *pm) const = 0;
virtual int read(
uint64_t off,
// for managing buffered readers/writers
virtual int invalidate_cache(uint64_t off, uint64_t len) = 0;
- virtual int open(const string& path) = 0;
+ virtual int open(const std::string& path) = 0;
virtual void close() = 0;
};
}
// drop lock while we wait for io
- list<FS::aio_t> completed_ios;
+ list<aio_t> completed_ios;
_claim_completed_aios(log_writer, &completed_ios);
l.unlock();
wait_for_aio(log_writer);
// we need to retire old completed aios so they don't stick around in
// memory indefinitely (along with their bufferlist refs).
-void BlueFS::_claim_completed_aios(FileWriter *h, list<FS::aio_t> *ls)
+void BlueFS::_claim_completed_aios(FileWriter *h, list<aio_t> *ls)
{
for (auto p : h->iocv) {
if (p) {
return r;
uint64_t old_dirty_seq = h->file->dirty_seq;
- list<FS::aio_t> completed_ios;
+ list<aio_t> completed_ios;
_claim_completed_aios(h, &completed_ios);
lock.unlock();
wait_for_aio(h);
int _flush(FileWriter *h, bool force);
int _fsync(FileWriter *h, std::unique_lock<std::mutex>& l);
- void _claim_completed_aios(FileWriter *h, list<FS::aio_t> *ls);
+ void _claim_completed_aios(FileWriter *h, list<aio_t> *ls);
void wait_for_aio(FileWriter *h); // safe to call without a lock
int _flush_and_sync_log(std::unique_lock<std::mutex>& l,
while (!aio_stop) {
dout(40) << __func__ << " polling" << dendl;
int max = 16;
- FS::aio_t *aio[max];
+ aio_t *aio[max];
int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms,
aio, max);
if (r < 0) {
}
}
-void KernelDevice::debug_aio_link(FS::aio_t& aio)
+void KernelDevice::debug_aio_link(aio_t& aio)
{
if (debug_queue.empty()) {
debug_oldest = &aio;
debug_queue.push_back(aio);
}
-void KernelDevice::debug_aio_unlink(FS::aio_t& aio)
+void KernelDevice::debug_aio_unlink(aio_t& aio)
{
if (aio.queue_item.is_linked()) {
debug_queue.erase(debug_queue.iterator_to(aio));
// move these aside, and get our end iterator position now, as the
// aios might complete as soon as they are submitted and queue more
// wal aio's.
- list<FS::aio_t>::iterator e = ioc->running_aios.begin();
+ list<aio_t>::iterator e = ioc->running_aios.begin();
ioc->running_aios.splice(e, ioc->pending_aios);
- list<FS::aio_t>::iterator p = ioc->running_aios.begin();
+ list<aio_t>::iterator p = ioc->running_aios.begin();
int pending = ioc->num_pending.load();
ioc->num_running += pending;
bool done = false;
while (!done) {
- FS::aio_t& aio = *p;
+ aio_t& aio = *p;
aio.priv = static_cast<void*>(ioc);
dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd
<< " 0x" << std::hex << aio.offset << "~" << aio.length
// be careful: as soon as we submit aio we race with completion.
// since we are holding a ref take care not to dereference txc at
// all after that point.
- list<FS::aio_t>::iterator cur = p;
+ list<aio_t>::iterator cur = p;
++p;
done = (p == e);
#ifdef HAVE_LIBAIO
if (aio && dio && !buffered) {
- ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct));
+ ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
++ioc->num_pending;
- FS::aio_t& aio = ioc->pending_aios.back();
+ aio_t& aio = ioc->pending_aios.back();
if (cct->_conf->bdev_inject_crash &&
rand() % cct->_conf->bdev_inject_crash == 0) {
derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
#ifdef HAVE_LIBAIO
if (aio && dio) {
_aio_log_start(ioc, off, len);
- ioc->pending_aios.push_back(FS::aio_t(ioc, fd_direct));
+ ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
++ioc->num_pending;
- FS::aio_t& aio = ioc->pending_aios.back();
+ aio_t& aio = ioc->pending_aios.back();
aio.pread(off, len);
for (unsigned i=0; i<aio.iov.size(); ++i) {
dout(30) << "aio " << i << " " << aio.iov[i].iov_base
#include <atomic>
#include "os/fs/FS.h"
+#include "os/fs/aio.h"
#include "include/interval_set.h"
#include "BlockDevice.h"
int fd_direct, fd_buffered;
uint64_t size;
uint64_t block_size;
- string path;
+ std::string path;
FS *fs;
bool aio, dio;
std::atomic<bool> io_since_flush = {false};
std::mutex flush_mutex;
- FS::aio_queue_t aio_queue;
+ aio_queue_t aio_queue;
aio_callback_t aio_callback;
void *aio_callback_priv;
bool aio_stop;
int direct_read_unaligned(uint64_t off, uint64_t len, char *buf);
// stalled aio debugging
- FS::aio_list_t debug_queue;
+ aio_list_t debug_queue;
std::mutex debug_queue_lock;
- FS::aio_t *debug_oldest = nullptr;
+ aio_t *debug_oldest = nullptr;
utime_t debug_stall_since;
- void debug_aio_link(FS::aio_t& aio);
- void debug_aio_unlink(FS::aio_t& aio);
+ void debug_aio_link(aio_t& aio);
+ void debug_aio_unlink(aio_t& aio);
public:
KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv);
return block_size;
}
- int collect_metadata(string prefix, map<string,string> *pm) const override;
+ int collect_metadata(std::string prefix, map<std::string,std::string> *pm) const override;
int read(uint64_t off, uint64_t len, bufferlist *pbl,
IOContext *ioc,
// for managing buffered readers/writers
int invalidate_cache(uint64_t off, uint64_t len) override;
- int open(const string& path) override;
+ int open(const std::string& path) override;
void close() override;
};
// ---------------
-#if defined(HAVE_LIBAIO)
-int FS::aio_queue_t::submit(aio_t &aio, int *retries)
-{
- // 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
- int attempts = 16;
- int delay = 125;
- iocb *piocb = &aio.iocb;
- while (true) {
- int r = io_submit(ctx, 1, &piocb);
- if (r < 0) {
- if (r == -EAGAIN && attempts-- > 0) {
- usleep(delay);
- delay *= 2;
- (*retries)++;
- continue;
- }
- return r;
- }
- assert(r == 1);
- break;
- }
- return 0;
-}
-
-int FS::aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
-{
- io_event event[max];
- struct timespec t = {
- timeout_ms / 1000,
- (timeout_ms % 1000) * 1000 * 1000
- };
-
- int r = 0;
- do {
- r = io_getevents(ctx, 1, max, event, &t);
- } while (r == -EINTR);
-
- for (int i=0; i<r; ++i) {
- paio[i] = (aio_t *)event[i].obj;
- paio[i]->rval = event[i].res;
- }
- return r;
-}
-#endif
#include <errno.h>
#include <time.h>
-#include "acconfig.h"
-#ifdef HAVE_LIBAIO
-# include <libaio.h>
-#endif
-
#include <string>
#include "include/types.h"
#include "common/Mutex.h"
#include "common/Cond.h"
-#include <boost/intrusive/list.hpp>
-#include <boost/container/small_vector.hpp>
class FS {
public:
virtual int zero(int fd, uint64_t offset, uint64_t length);
// -- aio --
-#if defined(HAVE_LIBAIO)
- struct aio_t {
- struct iocb iocb; // must be first element; see shenanigans in aio_queue_t
- void *priv;
- int fd;
- boost::container::small_vector<iovec,4> iov;
- uint64_t offset, length;
- int rval;
- bufferlist bl; ///< write payload (so that it remains stable for duration)
-
- boost::intrusive::list_member_hook<> queue_item;
-
- aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) {
- }
-
- void pwritev(uint64_t _offset, uint64_t len) {
- offset = _offset;
- length = len;
- io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset);
- }
- void pread(uint64_t _offset, uint64_t len) {
- offset = _offset;
- length = len;
- bufferptr p = buffer::create_page_aligned(length);
- io_prep_pread(&iocb, fd, p.c_str(), length, offset);
- bl.append(std::move(p));
- }
-
- int get_return_value() {
- return rval;
- }
- };
-
- typedef boost::intrusive::list<
- aio_t,
- boost::intrusive::member_hook<
- aio_t,
- boost::intrusive::list_member_hook<>,
- &aio_t::queue_item> > aio_list_t;
-
- struct aio_queue_t {
- int max_iodepth;
- io_context_t ctx;
-
-
- explicit aio_queue_t(unsigned max_iodepth)
- : max_iodepth(max_iodepth),
- ctx(0) {
- }
- ~aio_queue_t() {
- assert(ctx == 0);
- }
-
- int init() {
- assert(ctx == 0);
- return io_setup(max_iodepth, &ctx);
- }
- void shutdown() {
- if (ctx) {
- int r = io_destroy(ctx);
- assert(r == 0);
- ctx = 0;
- }
- }
-
- int submit(aio_t &aio, int *retries);
- int get_next_completed(int timeout_ms, aio_t **paio, int max);
- };
-#endif
};
#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "aio.h"
+
+#if defined(HAVE_LIBAIO)
+/**
+ * Submit a single aio through io_submit() on this queue's context.
+ *
+ * When io_submit() returns -EAGAIN the call is retried after a sleep
+ * that doubles on every retry; any other negative result, or -EAGAIN
+ * once the retry budget is used up, is returned to the caller as is.
+ *
+ * @param aio      request whose iocb has already been prepared
+ * @param retries  incremented once per retry; it is dereferenced
+ *                 without a null check and is not reset here
+ * @return 0 once the iocb has been accepted, otherwise the negative
+ *         value returned by io_submit()
+ */
+int aio_queue_t::submit(aio_t &aio, int *retries)
+{
+ // up to 16 retries with a doubling delay: total sleep <= 125us * (2^16 - 1) = ~8 seconds
+ int attempts = 16; // retries still allowed after an -EAGAIN
+ int delay = 125; // usec; doubled after each sleep, so the longest single sleep is ~4 seconds
+ iocb *piocb = &aio.iocb; // io_submit() takes an array of iocb pointers; ours has one entry
+ while (true) {
+ int r = io_submit(ctx, 1, &piocb);
+ if (r < 0) {
+ if (r == -EAGAIN && attempts-- > 0) {
+ usleep(delay);
+ delay *= 2;
+ (*retries)++; // let the caller see how many times we backed off
+ continue;
+ }
+ return r;
+ }
+ assert(r == 1); // exactly one iocb was handed in, so exactly one must be accepted
+ break;
+ }
+ return 0;
+}
+/**
+ * Collect completed aios from this queue's context.
+ *
+ * Calls io_getevents() asking for at least 1 and at most max events
+ * with a timeout of timeout_ms milliseconds, repeating the call while
+ * it returns -EINTR. For every event returned, the aio is stored in
+ * paio and the event's res value is recorded in that aio's rval.
+ *
+ * @param timeout_ms  timeout in milliseconds, split into seconds and
+ *                    nanoseconds for io_getevents()
+ * @param paio        output array; entries [0, return value) are filled
+ * @param max         maximum number of events to fetch in this call
+ * @return the value returned by io_getevents(): the number of entries
+ *         written to paio, or a negative error other than -EINTR
+ */
+int aio_queue_t::get_next_completed(int timeout_ms, aio_t **paio, int max)
+{
+ io_event event[max];
+ struct timespec t = {
+ timeout_ms / 1000, // whole seconds
+ (timeout_ms % 1000) * 1000 * 1000 // remaining milliseconds, as nanoseconds
+ };
+ // fetch events, restarting the call whenever it reports -EINTR
+ int r = 0;
+ do {
+ r = io_getevents(ctx, 1, max, event, &t);
+ } while (r == -EINTR);
+ // hand back each completed aio with its result stored in rval; the loop is skipped when r <= 0
+ for (int i=0; i<r; ++i) {
+ paio[i] = (aio_t *)event[i].obj; // direct cast; aio_t declares its iocb as the first member
+ paio[i]->rval = event[i].res;
+ }
+ return r;
+}
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "acconfig.h"
+#ifdef HAVE_LIBAIO
+# include <libaio.h>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/container/small_vector.hpp>
+
+#include "include/buffer.h"
+/**
+ * One asynchronous I/O request: the libaio control block together with
+ * the file descriptor, range, buffers and result slot that go with it.
+ *
+ * The constructor only fills in priv, fd, offset, length and rval; the
+ * iocb itself is prepared later by pwritev() or pread().
+ */
+struct aio_t {
+ struct iocb iocb; // must be first element; aio_queue_t::get_next_completed() casts io_event::obj straight to aio_t*
+ void *priv; ///< opaque pointer supplied to the constructor
+ int fd; ///< file descriptor handed to io_prep_pwritev()/io_prep_pread()
+ boost::container::small_vector<iovec,4> iov; ///< segments passed to io_prep_pwritev()
+ uint64_t offset, length; ///< range recorded by pwritev()/pread()
+ int rval; ///< result slot; -1000 until aio_queue_t::get_next_completed() stores event.res
+ bufferlist bl; ///< write payload (so that it remains stable for duration); pread() appends its read buffer here
+ // hook through which aio_list_t links this aio into an intrusive list
+ boost::intrusive::list_member_hook<> queue_item;
+
+ aio_t(void *p, int f) : priv(p), fd(f), offset(0), length(0), rval(-1000) {
+ }
+ /* Record the range and prepare iocb as a vectored write of iov at
+  * _offset. NOTE(review): this takes &iov[0], so iov is presumably
+  * filled in by the caller beforehand -- confirm it is never empty. */
+ void pwritev(uint64_t _offset, uint64_t len) {
+ offset = _offset;
+ length = len;
+ io_prep_pwritev(&iocb, fd, &iov[0], iov.size(), offset);
+ }
+ void pread(uint64_t _offset, uint64_t len) {
+ offset = _offset;
+ length = len;
+ bufferptr p = buffer::create_page_aligned(length); // page-aligned destination for the read
+ io_prep_pread(&iocb, fd, p.c_str(), length, offset);
+ bl.append(std::move(p)); // bl now holds the buffer the read lands in
+ }
+ // result recorded by aio_queue_t::get_next_completed(); still -1000 if none has been stored
+ int get_return_value() {
+ return rval;
+ }
+};
+/// Boost.Intrusive list of aio_t, linked through each element's queue_item member hook.
+typedef boost::intrusive::list<
+ aio_t,
+ boost::intrusive::member_hook<
+ aio_t,
+ boost::intrusive::list_member_hook<>,
+ &aio_t::queue_item> > aio_list_t;
+/**
+ * Wrapper around a libaio io_context_t: init() creates the context,
+ * shutdown() destroys it, submit() queues an aio and
+ * get_next_completed() collects finished ones.
+ */
+struct aio_queue_t {
+ int max_iodepth; ///< passed to io_setup() by init()
+ io_context_t ctx; ///< 0 whenever no context is set up
+ // the constructor only records max_iodepth; the context itself is created by init()
+ explicit aio_queue_t(unsigned max_iodepth)
+ : max_iodepth(max_iodepth),
+ ctx(0) {
+ }
+ ~aio_queue_t() {
+ assert(ctx == 0); // a context created by init() must be released with shutdown() first
+ }
+ // create the context; must not already be set up. Returns the io_setup() result.
+ int init() {
+ assert(ctx == 0);
+ return io_setup(max_iodepth, &ctx);
+ }
+ void shutdown() {
+ if (ctx) { // nothing to do when no context is set up
+ int r = io_destroy(ctx);
+ assert(r == 0);
+ ctx = 0; // lets init() be called again and satisfies the destructor's check
+ }
+ }
+ // implemented out of line
+ int submit(aio_t &aio, int *retries);
+ int get_next_completed(int timeout_ms, aio_t **paio, int max);
+};
+
+#endif