std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
const std::string &t)
{
+ std::shared_ptr<NetworkStack> stack = nullptr;
+
if (t == "posix")
- return std::make_shared<PosixNetworkStack>(c, t);
+ stack.reset(new PosixNetworkStack(c, t));
#ifdef HAVE_RDMA
else if (t == "rdma")
- return std::make_shared<RDMAStack>(c, t);
+ stack.reset(new RDMAStack(c, t));
#endif
#ifdef HAVE_DPDK
else if (t == "dpdk")
- return std::make_shared<DPDKStack>(c, t);
+ stack.reset(new DPDKStack(c, t));
#endif
- lderr(c) << __func__ << " ms_async_transport_type " << t <<
+ if (stack == nullptr) {
+ lderr(c) << __func__ << " ms_async_transport_type " << t <<
" is not supported! " << dendl;
- ceph_abort();
- return nullptr;
-}
-
-Worker* NetworkStack::create_worker(CephContext *c, const std::string &type, unsigned worker_id)
-{
- if (type == "posix")
- return new PosixWorker(c, worker_id);
-#ifdef HAVE_RDMA
- else if (type == "rdma")
- return new RDMAWorker(c, worker_id);
-#endif
-#ifdef HAVE_DPDK
- else if (type == "dpdk")
- return new DPDKWorker(c, worker_id);
-#endif
+ ceph_abort();
+ return nullptr;
+ }
+
+ const int InitEventNumber = 5000;
+ for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
+ Worker *w = stack->create_worker(c, worker_id);
+ int ret = w->center.init(InitEventNumber, worker_id, t);
+ if (ret)
+ throw std::system_error(-ret, std::generic_category());
+ stack->workers.push_back(w);
+ }
- lderr(c) << __func__ << " ms_async_transport_type " << type <<
- " is not supported! " << dendl;
- ceph_abort();
- return nullptr;
+ return stack;
}
NetworkStack::NetworkStack(CephContext *c, const std:: string &t): type(t), started(false), cct(c)
{
ceph_assert(cct->_conf->ms_async_op_threads > 0);
- const int InitEventNumber = 5000;
num_workers = cct->_conf->ms_async_op_threads;
if (num_workers >= EventCenter::MAX_EVENTCENTER) {
ldout(cct, 0) << __func__ << " max thread limit is "
<< dendl;
num_workers = EventCenter::MAX_EVENTCENTER;
}
-
- for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
- Worker *w = create_worker(cct, type, worker_id);
- int ret = w->center.init(InitEventNumber, worker_id, type);
- if (ret)
- throw std::system_error(-ret, std::generic_category());
- workers.push_back(w);
- }
}
void NetworkStack::start()
std::function<void ()> add_thread(unsigned i);
+ virtual Worker* create_worker(CephContext *c, unsigned i) = 0;
+
protected:
CephContext *cct;
std::vector<Worker*> workers;
static std::shared_ptr<NetworkStack> create(
CephContext *c, const std::string &type);
- static Worker* create_worker(
- CephContext *c, const std::string &t, unsigned i);
// backend need to override this method if backend doesn't support shared
// listen table.
// For example, posix backend has in kernel global listen table. If one