//
funcs.push_back(std::move(func));
int r = 0;
- r = dpdk::eal::init(cct);
+ r = eal.start();
if (r < 0) {
- lderr(cct) << __func__ << " init dpdk rte failed, r=" << r << dendl;
+ lderr(cct) << __func__ << " start dpdk rte failed, r=" << r << dendl;
ceph_abort();
}
- // if dpdk::eal::init already called by NVMEDevice, we will select 1..n
+ // if eal.start already called by NVMEDevice, we will select 1..n
// cores
unsigned nr_worker = funcs.size();
ceph_assert(rte_lcore_count() >= nr_worker);
}
}
void *adapted_func = static_cast<void*>(&funcs.back());
- dpdk::eal::execute_on_master([adapted_func, core_id, this]() {
+ eal.execute_on_master([adapted_func, core_id, this]() {
int r = rte_eal_remote_launch(dpdk_thread_adaptor, adapted_func, core_id);
if (r < 0) {
lderr(cct) << __func__ << " remote launch failed, r=" << r << dendl;
void DPDKStack::join_worker(unsigned i)
{
- dpdk::eal::execute_on_master([&]() {
+ eal.execute_on_master([&]() {
rte_eal_wait_lcore(i+1);
});
+ if (i+1 == get_num_worker())
+ eal.stop();
}
#include "const.h"
#include "IP.h"
#include "Packet.h"
+#include "dpdk_rte.h"
class interface;
friend class DPDKServerSocketImpl<tcp4>;
};
+using namespace dpdk;
class DPDKStack : public NetworkStack {
std::vector<std::function<void()> > funcs;
}
public:
- explicit DPDKStack(CephContext *cct): NetworkStack(cct) {
+ explicit DPDKStack(CephContext *cct): NetworkStack(cct), eal(cct) {
funcs.reserve(cct->_conf->ms_async_op_threads);
}
virtual bool support_local_listen_table() const override { return true; }
virtual void spawn_worker(std::function<void ()> &&func) override;
virtual void join_worker(unsigned i) override;
+ private:
+ dpdk::eal eal;
};
#endif
return v;
}
- bool eal::initialized = false;
- std::thread eal::t;
- std::mutex eal::lock;
- std::condition_variable eal::cond;
- std::list<std::function<void()>> eal::funcs;
-
static int bitcount(unsigned long long n)
{
return std::bitset<CHAR_BIT * sizeof(n)>{n}.count();
return count;
}
- int eal::init(CephContext *c)
+ bool eal::rte_initialized = false;
+
+ int eal::start()
{
if (initialized) {
return 1;
}
bool done = false;
- auto coremask = c->_conf.get_val<std::string>("ms_dpdk_coremask");
+ auto coremask = cct->_conf.get_val<std::string>("ms_dpdk_coremask");
int coremaskbit = coremask_bitcount(coremask.c_str());
if (coremaskbit <= 0
- || static_cast<uint64_t>(coremaskbit) < c->_conf->ms_async_op_threads)
+ || static_cast<uint64_t>(coremaskbit) < cct->_conf->ms_async_op_threads)
return -EINVAL;
t = std::thread([&]() {
// TODO: Inherit these from the app parameters - "opts"
std::vector<std::vector<char>> args {
string2vector("ceph"),
- string2vector("-c"), string2vector(c->_conf.get_val<std::string>("ms_dpdk_coremask")),
- string2vector("-n"), string2vector(c->_conf->ms_dpdk_memory_channel),
+ string2vector("-c"), string2vector(cct->_conf.get_val<std::string>("ms_dpdk_coremask")),
+ string2vector("-n"), string2vector(cct->_conf->ms_dpdk_memory_channel),
};
std::optional<std::string> hugepages_path;
- if (!c->_conf->ms_dpdk_hugepages.empty()) {
- hugepages_path.emplace(c->_conf->ms_dpdk_hugepages);
+ if (!cct->_conf->ms_dpdk_hugepages.empty()) {
+ hugepages_path.emplace(cct->_conf->ms_dpdk_hugepages);
}
// If "hugepages" is not provided and DPDK PMD drivers mode is requested -
args.push_back(string2vector("-m"));
args.push_back(string2vector(size_MB_str.str()));
- } else if (!c->_conf->ms_dpdk_pmd.empty()) {
+ } else if (!cct->_conf->ms_dpdk_pmd.empty()) {
args.push_back(string2vector("--no-huge"));
}
std::string rte_file_prefix;
rte_file_prefix = "rte_";
- rte_file_prefix += c->_conf->name.to_str();
+ rte_file_prefix += cct->_conf->name.to_str();
args.push_back(string2vector("--file-prefix"));
args.push_back(string2vector(rte_file_prefix));
for (auto&& a: args) {
cargs.push_back(a.data());
}
- /* initialise the EAL for all */
- int ret = rte_eal_init(cargs.size(), cargs.data());
- if (ret < 0)
- return ret;
+ if (!rte_initialized) {
+ /* initialise the EAL for all */
+ int ret = rte_eal_init(cargs.size(), cargs.data());
+ if (ret < 0)
+ return;
+ rte_initialized = true;
+ }
std::unique_lock<std::mutex> l(lock);
initialized = true;
done = true;
cond.notify_all();
- while (true) {
+ while (!stopped) {
+ cond.wait(l, [this] { return !funcs.empty() || stopped; });
if (!funcs.empty()) {
auto f = std::move(funcs.front());
funcs.pop_front();
f();
cond.notify_all();
- } else {
- cond.wait(l);
}
}
});
- t.detach();
std::unique_lock<std::mutex> l(lock);
while (!done)
cond.wait(l);
return memsize;
}
+ void eal::stop()
+ {
+ assert(initialized);
+ assert(!stopped);
+ stopped = true;
+ cond.notify_all();
+ t.join();
+ }
+
} // namespace dpdk
class eal {
public:
using cpuset = std::bitset<RTE_MAX_LCORE>;
-
- static std::mutex lock;
- static std::condition_variable cond;
- static std::list<std::function<void()>> funcs;
- static int init(CephContext *c);
- static void execute_on_master(std::function<void()> &&f) {
+ explicit eal(CephContext *cct) : cct(cct) {}
+ int start();
+ void stop();
+ void execute_on_master(std::function<void()> &&f) {
bool done = false;
std::unique_lock<std::mutex> l(lock);
funcs.emplace_back([&]() { f(); done = true; });
*
* @return
*/
- static size_t mem_size(int num_cpus);
- static bool initialized;
- static std::thread t;
+ size_t mem_size(int num_cpus);
+ static bool rte_initialized;
+ private:
+ CephContext *cct;
+ bool initialized = false;
+ bool stopped = false;
+ std::thread t;
+ std::mutex lock;
+ std::condition_variable cond;
+ std::list<std::function<void()>> funcs;
};
} // namespace dpdk