#include <fcntl.h>
#include <unistd.h>
#include <map>
+#include <thread>
#ifdef HAVE_SSE
#include <xmmintrin.h>
#endif
static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
-static const char *ealargs[] = {
- "ceph-osd",
- "-c 0x1", /* This must be the second parameter. It is overwritten by index in main(). */
- "-n 4",
-};
-
+int dpdk_thread_adaptor(void *f)
+{
+ (*static_cast<std::function<void ()>*>(f))();
+ return 0;
+}
class SharedDriverData {
+ unsigned id;
std::string sn;
std::string name;
spdk_nvme_ctrlr *ctrlr;
spdk_nvme_ns *ns;
+ std::function<void ()> run_func;
uint64_t block_size = 0;
uint64_t size = 0;
std::vector<NVMEDevice*> registered_devices;
- struct AioCompletionThread : public Thread {
- SharedDriverData *data;
- AioCompletionThread(SharedDriverData *d) : data(d) {}
- void *entry() {
- data->_aio_thread();
- return NULL;
- }
- } aio_thread;
friend class AioCompletionThread;
bool aio_stop = false;
void _aio_thread();
void _aio_start() {
- aio_thread.create("nvme_aio_thread");
+ int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func), id);
+ assert(r == 0);
}
void _aio_stop() {
- assert(aio_thread.is_started());
{
Mutex::Locker l(queue_lock);
aio_stop = true;
queue_cond.Signal();
}
- aio_thread.join();
+ int r = rte_eal_wait_lcore(id);
+ assert(r == 0);
aio_stop = false;
}
std::atomic_bool queue_empty;
std::atomic_int inflight_ops;
PerfCounters *logger = nullptr;
- SharedDriverData(const std::string &sn_tag, const std::string &n, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
- : sn(sn_tag),
+ SharedDriverData(unsigned i, const std::string &sn_tag, const std::string &n,
+ spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
+ : id(i),
+ sn(sn_tag),
name(n),
ctrlr(c),
ns(ns),
- aio_thread(this),
+ run_func([this]() { _aio_thread(); }),
queue_empty(false),
queue_lock("NVMEDevice::queue_lock"),
flush_lock("NVMEDevice::flush_lock"),
#define dout_prefix *_dout << "bdev "
class NVMEManager {
- Mutex lock;
- bool init = false;
- std::vector<SharedDriverData*> shared_driver_datas;
-
public:
struct ProbeContext {
string sn_tag;
NVMEManager *manager;
SharedDriverData *driver;
+ bool done;
};
+ private:
+ Mutex lock;
+ bool init = false;
+ std::vector<SharedDriverData*> shared_driver_datas;
+ std::thread dpdk_thread;
+ std::mutex probe_queue_lock;
+ std::condition_variable probe_queue_cond;
+ std::list<ProbeContext*> probe_queue;
+
+ public:
NVMEManager()
: lock("NVMEDevice::NVMEManager::lock") {}
int try_get(const string &sn_tag, SharedDriverData **driver);
dout(1) << __func__ << " successfully attach nvme device at" << name
<< " " << spdk_pci_device_get_bus(pci_dev) << ":" << spdk_pci_device_get_dev(pci_dev) << ":" << spdk_pci_device_get_func(pci_dev) << dendl;
- shared_driver_datas.push_back(new SharedDriverData(sn_tag, name, c, ns));
+ // only support one device per osd now!
+ assert(shared_driver_datas.empty());
+ // index 0 is occured by master thread
+ shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, name, c, ns));
*driver = shared_driver_datas.back();
}
};
{
Mutex::Locker l(lock);
int r = 0;
- if (!init) {
- r = rte_eal_init(sizeof(ealargs) / sizeof(ealargs[0]), (char **)(void *)(uintptr_t)ealargs);
- if (r < 0) {
- derr << __func__ << " failed to do rte_eal_init" << dendl;
- return r;
- }
-
- request_mempool = rte_mempool_create("nvme_request", 512,
- spdk_nvme_request_size(), 128, 0,
- NULL, NULL, NULL, NULL,
- SOCKET_ID_ANY, 0);
- if (request_mempool == NULL) {
- derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
- return -ENOMEM;
- }
-
- task_pool = rte_mempool_create(
- "task_pool", 512, sizeof(Task),
- 64, 0, NULL, NULL, NULL, NULL,
- SOCKET_ID_ANY, 0);
- if (task_pool == NULL) {
- derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
- return -ENOMEM;
- }
-
- pci_system_init();
- spdk_nvme_retry_count = g_conf->bdev_nvme_retry_count;
- if (spdk_nvme_retry_count < 0)
- spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
-
- init = true;
- }
-
if (sn_tag.empty()) {
r = -ENOENT;
derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
}
}
- ProbeContext ctx = {sn_tag, this, nullptr};
- r = spdk_nvme_probe(&ctx, probe_cb, attach_cb);
- if (r < 0) {
- assert(!ctx.driver);
- derr << __func__ << " device probe nvme failed" << dendl;
- return r;
+ if (!init) {
+ init = true;
+ dpdk_thread = std::thread(
+ [this]() {
+ static const char *ealargs[] = {
+ "ceph-osd",
+ "-c 0x3", /* This must be the second parameter. It is overwritten by index in main(). */
+ "-n 4",
+ };
+
+ int r = rte_eal_init(sizeof(ealargs) / sizeof(ealargs[0]), (char **)(void *)(uintptr_t)ealargs);
+ if (r < 0) {
+ derr << __func__ << " failed to do rte_eal_init" << dendl;
+ assert(0);
+ }
+
+ request_mempool = rte_mempool_create("nvme_request", 512,
+ spdk_nvme_request_size(), 128, 0,
+ NULL, NULL, NULL, NULL,
+ SOCKET_ID_ANY, 0);
+ if (request_mempool == NULL) {
+ derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
+ assert(0);
+ }
+
+ task_pool = rte_mempool_create(
+ "task_pool", 512, sizeof(Task),
+ 64, 0, NULL, NULL, NULL, NULL,
+ SOCKET_ID_ANY, 0);
+ if (task_pool == NULL) {
+ derr << __func__ << " failed to create memory pool for nvme requests" << dendl;
+ assert(0);
+ }
+
+ pci_system_init();
+ spdk_nvme_retry_count = g_conf->bdev_nvme_retry_count;
+ if (spdk_nvme_retry_count < 0)
+ spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
+
+ std::unique_lock<std::mutex> l(probe_queue_lock);
+ while (true) {
+ if (!probe_queue.empty()) {
+ ProbeContext* ctxt = probe_queue.front();
+ probe_queue.pop_front();
+ r = spdk_nvme_probe(ctxt, probe_cb, attach_cb);
+ if (r < 0) {
+ assert(!ctxt->driver);
+ derr << __func__ << " device probe nvme failed" << dendl;
+ }
+ ctxt->done = true;
+ probe_queue_cond.notify_all();
+ } else {
+ probe_queue_cond.wait(l);
+ }
+ }
+ }
+ );
+ dpdk_thread.detach();
}
+ ProbeContext ctx = {sn_tag, this, nullptr, false};
+ {
+ std::unique_lock<std::mutex> l(probe_queue_lock);
+ probe_queue.push_back(&ctx);
+ while (!ctx.done)
+ probe_queue_cond.wait(l);
+ }
+ if (!ctx.driver)
+ return -1;
*driver = ctx.driver;
return 0;