From: Kefu Chai Date: Sun, 14 Feb 2021 07:29:07 +0000 (+0800) Subject: msg/async/dpdk: use optional<> instead of Tub<> X-Git-Tag: v17.1.0~2896^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3dbcaf5606efb2fc0ec078e8326c5fa39bdfa3fa;p=ceph.git msg/async/dpdk: use optional<> instead of Tub<> Signed-off-by: Kefu Chai --- diff --git a/src/msg/async/dpdk/ARP.cc b/src/msg/async/dpdk/ARP.cc index dedc9e3c7aad..f73eed40c28d 100644 --- a/src/msg/async/dpdk/ARP.cc +++ b/src/msg/async/dpdk/ARP.cc @@ -48,9 +48,9 @@ arp::arp(interface* netif): ) {} -Tub arp::get_packet() +std::optional arp::get_packet() { - Tub p; + std::optional p; if (!_packetq.empty()) { p = std::move(_packetq.front()); _packetq.pop_front(); diff --git a/src/msg/async/dpdk/ARP.h b/src/msg/async/dpdk/ARP.h index 5456956480a5..e2f6dfe98fe4 100644 --- a/src/msg/async/dpdk/ARP.h +++ b/src/msg/async/dpdk/ARP.h @@ -85,7 +85,7 @@ class arp { ethernet_address l2self() { return _netif->hw_address(); } int process_packet(Packet p, ethernet_address from); bool forward(forward_hash& out_hash_data, Packet& p, size_t off); - Tub get_packet(); + std::optional get_packet(); template friend class arp_for; }; diff --git a/src/msg/async/dpdk/DPDK.cc b/src/msg/async/dpdk/DPDK.cc index ff4967888108..4c5df6e578ce 100644 --- a/src/msg/async/dpdk/DPDK.cc +++ b/src/msg/async/dpdk/DPDK.cc @@ -467,7 +467,7 @@ void DPDKQueuePair::configure_proxies(const std::map& cpu_weigh return; } register_packet_provider([this] { - Tub p; + std::optional p; if (!_proxy_packetq.empty()) { p = std::move(_proxy_packetq.front()); _proxy_packetq.pop_front(); @@ -716,7 +716,7 @@ bool DPDKQueuePair::poll_tx() { return false; } -inline Tub DPDKQueuePair::from_mbuf_lro(rte_mbuf* m) +inline std::optional DPDKQueuePair::from_mbuf_lro(rte_mbuf* m) { _frags.clear(); _bufs.clear(); @@ -736,7 +736,7 @@ inline Tub DPDKQueuePair::from_mbuf_lro(rte_mbuf* m) _frags.begin(), _frags.end(), make_deleter(std::move(del))); } -inline Tub DPDKQueuePair::from_mbuf(rte_mbuf* m) +inline std::optional DPDKQueuePair::from_mbuf(rte_mbuf* m) { _rx_free_pkts.push_back(m); _num_rx_free_segs += m->nb_segs; @@ -824,7 +824,7 @@ void DPDKQueuePair::process_packets( struct rte_mbuf *m = bufs[i]; offload_info oi; - Tub p = from_mbuf(m); + std::optional p = from_mbuf(m); // Drop the packet if translation above has failed if (!p) { diff --git a/src/msg/async/dpdk/DPDK.h b/src/msg/async/dpdk/DPDK.h index 78a1a0769326..2bfbd3c75a48 100644 --- a/src/msg/async/dpdk/DPDK.h +++ b/src/msg/async/dpdk/DPDK.h @@ -23,16 +23,17 @@ #ifndef CEPH_DPDK_DEV_H #define CEPH_DPDK_DEV_H -#include #include +#include +#include #include #include #include +#include #include #include #include "include/page.h" -#include "common/Tub.h" #include "common/perf_counters.h" #include "msg/async/Event.h" #include "const.h" @@ -85,8 +86,9 @@ enum { class DPDKDevice; class DPDKWorker; + class DPDKQueuePair { - using packet_provider_type = std::function ()>; + using packet_provider_type = std::function ()>; public: void configure_proxies(const std::map& cpu_weights); // build REdirection TAble for cpu_weights map: target cpu -> weight @@ -410,7 +412,7 @@ class DPDKQueuePair { private: struct rte_mbuf _mbuf; MARKER private_start; - Tub _p; + std::optional _p; phys_addr_t _buf_physaddr; uint16_t _data_off; // TRUE if underlying mbuf has been used in the zero-copy flow @@ -530,7 +532,7 @@ class DPDKQueuePair { } void rx_start() { - _rx_poller.construct(this); + _rx_poller.emplace(this); } uint32_t send(circular_buffer& pb) { @@ -642,7 +644,7 @@ class DPDKQueuePair { * @return a "optional" object representing the newly received data if in an * "engaged" state or an error if in a "disengaged" state. */ - Tub from_mbuf(rte_mbuf* m); + std::optional from_mbuf(rte_mbuf* m); /** * Transform an LRO rte_mbuf cluster into the "packet" object. @@ -651,12 +653,12 @@ class DPDKQueuePair { * @return a "optional" object representing the newly received LRO packet if * in an "engaged" state or an error if in a "disengaged" state. */ - Tub from_mbuf_lro(rte_mbuf* m); + std::optional from_mbuf_lro(rte_mbuf* m); private: CephContext *cct; std::vector _pkt_providers; - Tub> _sw_reta; + std::optional> _sw_reta; circular_buffer _proxy_packetq; stream _rx_stream; circular_buffer _tx_packetq; @@ -717,7 +719,7 @@ class DPDKQueuePair { return qp->poll_rx_once(); } }; - Tub _rx_poller; + std::optional _rx_poller; class DPDKTXGCPoller : public EventCenter::Poller { DPDKQueuePair *qp; diff --git a/src/msg/async/dpdk/DPDKStack.h b/src/msg/async/dpdk/DPDKStack.h index 926adaffcbb7..64397e659f11 100644 --- a/src/msg/async/dpdk/DPDKStack.h +++ b/src/msg/async/dpdk/DPDKStack.h @@ -16,9 +16,9 @@ #define CEPH_MSG_DPDKSTACK_H #include +#include #include "common/ceph_context.h" -#include "common/Tub.h" #include "msg/async/Stack.h" #include "net.h" @@ -54,8 +54,8 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl { typename Protocol::connection _conn; uint32_t _cur_frag = 0; uint32_t _cur_off = 0; - Tub _buf; - Tub _cache_ptr; + std::optional _buf; + std::optional _cache_ptr; public: explicit NativeConnectedSocketImpl(typename Protocol::connection conn) @@ -72,10 +72,10 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl { size_t off = 0; while (left > 0) { if (!_cache_ptr) { - _cache_ptr.construct(); + _cache_ptr.emplace(); r = zero_copy_read(*_cache_ptr); if (r <= 0) { - _cache_ptr.destroy(); + _cache_ptr.reset(); if (r == -EAGAIN) break; return r; @@ -85,7 +85,7 @@ class NativeConnectedSocketImpl : public ConnectedSocketImpl { _cache_ptr->copy_out(0, _cache_ptr->length(), buf+off); left -= _cache_ptr->length(); off += _cache_ptr->length(); - _cache_ptr.destroy(); + _cache_ptr.reset(); } else { _cache_ptr->copy_out(0, left, buf+off); _cache_ptr->set_offset(_cache_ptr->offset() + left); @@ -118,7 +118,7 @@ private: if (++_cur_frag == _buf->nr_frags()) { _cur_frag = 0; _cur_off = 0; - _buf.destroy(); + _buf.reset(); } else { _cur_off += f.size; } diff --git a/src/msg/async/dpdk/IP.cc b/src/msg/async/dpdk/IP.cc index fab534bb25b5..0bfb21b16821 100644 --- a/src/msg/async/dpdk/IP.cc +++ b/src/msg/async/dpdk/IP.cc @@ -333,7 +333,7 @@ void ipv4::send(ipv4_address to, ip_protocol_num proto_num, } } -Tub ipv4::get_packet() { +std::optional ipv4::get_packet() { // _packetq will be mostly empty here unless it hold remnants of previously // fragmented packet if (_packetq.empty()) { @@ -350,7 +350,7 @@ Tub ipv4::get_packet() { } } - Tub p; + std::optional p; if (!_packetq.empty()) { p = std::move(_packetq.front()); _packetq.pop_front(); diff --git a/src/msg/async/dpdk/IP.h b/src/msg/async/dpdk/IP.h index 1fc60658235f..e0e62f1228c1 100644 --- a/src/msg/async/dpdk/IP.h +++ b/src/msg/async/dpdk/IP.h @@ -60,7 +60,7 @@ struct ipv4_traits { ethernet_address e_dst; ip_protocol_num proto_num; }; - using packet_provider_type = std::function ()>; + using packet_provider_type = std::function ()>; static void tcp_pseudo_header_checksum(checksummer& csum, ipv4_address src, ipv4_address dst, uint16_t len) { csum.sum_many(src.ip, dst.ip, uint8_t(0), uint8_t(ip_protocol_num::tcp), len); } @@ -131,7 +131,7 @@ class icmp { explicit icmp(CephContext *c, inet_type& inet) : cct(c), _inet(inet), _queue_space(c, "DPDK::icmp::_queue_space", 212992) { _inet.register_packet_provider([this] { - Tub l4p; + std::optional l4p; if (!_packetq.empty()) { l4p = std::move(_packetq.front()); _packetq.pop_front(); @@ -213,7 +213,7 @@ class ipv4 { private: interface* _netif; std::vector _pkt_providers; - Tub frag_timefd; + std::optional frag_timefd; EventCallbackRef frag_handler; arp _global_arp; arp_for _arp; @@ -251,7 +251,7 @@ class ipv4 { private: int handle_received_packet(Packet p, ethernet_address from); bool forward(forward_hash& out_hash_data, Packet& p, size_t off); - Tub get_packet(); + std::optional get_packet(); bool in_my_netmask(ipv4_address a) const { return !((a.ip ^ _host_address.ip) & _netmask.ip); } @@ -262,11 +262,11 @@ class ipv4 { } void frag_arm(utime_t now) { auto tp = now + _frag_timeout; - frag_timefd.construct(center->create_time_event(tp.to_nsec() / 1000, frag_handler)); + frag_timefd = center->create_time_event(tp.to_nsec() / 1000, frag_handler); } void frag_arm() { auto now = ceph_clock_now(); - frag_timefd.construct(center->create_time_event(now.to_nsec() / 1000, frag_handler)); + frag_timefd = center->create_time_event(now.to_nsec() / 1000, frag_handler); } public: diff --git a/src/msg/async/dpdk/Packet.h b/src/msg/async/dpdk/Packet.h index f929da317860..2aa65f6e1edb 100644 --- a/src/msg/async/dpdk/Packet.h +++ b/src/msg/async/dpdk/Packet.h @@ -28,7 +28,6 @@ #include #include "include/types.h" -#include "common/Tub.h" #include "common/deleter.h" #include "msg/async/Event.h" @@ -49,7 +48,7 @@ struct offload_info { bool reassembled = false; uint16_t tso_seg_size = 0; // HW stripped VLAN header (CPU order) - Tub vlan_tci; + std::optional vlan_tci; }; // Zero-copy friendly packet class @@ -97,7 +96,7 @@ class Packet { uint16_t _nr_frags = 0; uint16_t _allocated_frags; offload_info _offload_info; - Tub rss_hash; + std::optional rss_hash; char data[internal_data_size]; // only frags[0] may use unsigned headroom = internal_data_size; // in data // FIXME: share data/frags space @@ -122,7 +121,7 @@ class Packet { n->_nr_frags = old->_nr_frags; n->headroom = old->headroom; n->_offload_info = old->_offload_info; - n->rss_hash.construct(old->rss_hash); + n->rss_hash = old->rss_hash; std::copy(old->frags, old->frags + old->_nr_frags, n->frags); old->copy_internal_fragment_to(n.get()); return n; @@ -268,11 +267,11 @@ public: _impl = impl::allocate_if_needed(std::move(_impl), extra); } } - Tub rss_hash() { + std::optional rss_hash() { return _impl->rss_hash; } void set_rss_hash(uint32_t hash) { - _impl->rss_hash.construct(hash); + _impl->rss_hash = hash; } private: void linearize(size_t at_frag, size_t desired_size); diff --git a/src/msg/async/dpdk/TCP.h b/src/msg/async/dpdk/TCP.h index a0104fb44f41..79fd6fbb1309 100644 --- a/src/msg/async/dpdk/TCP.h +++ b/src/msg/async/dpdk/TCP.h @@ -234,7 +234,7 @@ class tcp { public: C_handle_delayed_ack(tcb *t): tc(t) { } void do_request(uint64_t r) { - tc->_delayed_ack_fd.destroy(); + tc->_delayed_ack_fd.reset(); tc->_nr_full_seg_received = 0; tc->output(); } @@ -246,7 +246,7 @@ class tcp { public: C_handle_retransmit(tcb *t): tc(t) { } void do_request(uint64_t r) { - tc->retransmit_fd.destroy(); + tc->retransmit_fd.reset(); tc->retransmit(); } }; @@ -257,7 +257,7 @@ class tcp { public: C_handle_persist(tcb *t): tc(t) { } void do_request(uint64_t r) { - tc->persist_fd.destroy(); + tc->persist_fd.reset(); tc->persist(); } }; @@ -364,7 +364,7 @@ class tcp { int16_t _errno = 1; tcp_option _option; EventCallbackRef delayed_ack_event; - Tub _delayed_ack_fd; + std::optional _delayed_ack_fd; // Retransmission timeout std::chrono::microseconds _rto{1000*1000}; std::chrono::microseconds _persist_time_out{1000*1000}; @@ -374,10 +374,10 @@ class tcp { static constexpr std::chrono::microseconds _rto_clk_granularity{1000}; static constexpr uint16_t _max_nr_retransmit{5}; EventCallbackRef retransmit_event; - Tub retransmit_fd; + std::optional retransmit_fd; EventCallbackRef persist_event; EventCallbackRef all_data_ack_event; - Tub persist_fd; + std::optional persist_fd; uint16_t _nr_full_seg_received = 0; struct isn_secret { // 512 bits secretkey for ISN generating @@ -407,13 +407,13 @@ class tcp { bool is_all_data_acked(); int send(Packet p); void connect(); - Tub read(); + std::optional read(); void close(); void remove_from_tcbs() { auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port}; _tcp._tcbs.erase(id); } - Tub get_packet(); + std::optional get_packet(); void output() { if (!_poll_active) { _poll_active = true; @@ -473,23 +473,23 @@ class tcp { void start_retransmit_timer() { if (retransmit_fd) center->delete_time_event(*retransmit_fd); - retransmit_fd.construct(center->create_time_event(_rto.count(), retransmit_event)); + retransmit_fd.emplace(center->create_time_event(_rto.count(), retransmit_event)); }; void stop_retransmit_timer() { if (retransmit_fd) { center->delete_time_event(*retransmit_fd); - retransmit_fd.destroy(); + retransmit_fd.reset(); } }; void start_persist_timer() { if (persist_fd) center->delete_time_event(*persist_fd); - persist_fd.construct(center->create_time_event(_persist_time_out.count(), persist_event)); + persist_fd.emplace(center->create_time_event(_persist_time_out.count(), persist_event)); }; void stop_persist_timer() { if (persist_fd) { center->delete_time_event(*persist_fd); - persist_fd.destroy(); + persist_fd.reset(); } }; void persist(); @@ -659,7 +659,7 @@ class tcp { int send(Packet p) { return _tcb->send(std::move(p)); } - Tub read() { + std::optional read() { return _tcb->read(); } int16_t get_errno() const { @@ -711,8 +711,8 @@ class tcp { _fd = _tcp.manager.get_eventfd(); return 0; } - Tub accept() { - Tub c; + std::optional accept() { + std::optional c; if (!_q.empty()) { c = std::move(_q.front()); _q.pop(); @@ -772,7 +772,7 @@ tcp::tcp(CephContext *c, inet_type& inet, EventCenter *cen) _e(_rd()), _queue_space(cct, "DPDK::tcp::queue_space", 81920) { int tcb_polled = 0u; _inet.register_packet_provider([this, tcb_polled] () mutable { - Tub l4p; + std::optional l4p; auto c = _poll_tcbs.size(); if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) { l4p = std::move(_packetq.front()); @@ -1220,12 +1220,12 @@ bool tcp::tcb::is_all_data_acked() { } template -Tub tcp::tcb::read() { - Tub p; +std::optional tcp::tcb::read() { + std::optional p; if (_rcv.data.empty()) return p; - p.construct(); + p.emplace(); for (auto&& q : _rcv.data) { p->append(std::move(q)); } @@ -1282,7 +1282,7 @@ bool tcp::tcb::should_send_ack(uint16_t seg_len) { _nr_full_seg_received = 0; if (_delayed_ack_fd) { center->delete_time_event(*_delayed_ack_fd); - _delayed_ack_fd.destroy(); + _delayed_ack_fd.reset(); } return true; } @@ -1293,7 +1293,7 @@ bool tcp::tcb::should_send_ack(uint16_t seg_len) { _nr_full_seg_received = 0; if (_delayed_ack_fd) { center->delete_time_event(*_delayed_ack_fd); - _delayed_ack_fd.destroy(); + _delayed_ack_fd.reset(); } return true; } @@ -1307,7 +1307,7 @@ bool tcp::tcb::should_send_ack(uint16_t seg_len) { // If the timer is not armed, schedule a delayed ACK. // The maximum delayed ack timer allowed by RFC1122 is 500ms, most // implementations use 200ms. - _delayed_ack_fd.construct(center->create_time_event(200*1000, delayed_ack_event)); + _delayed_ack_fd.emplace(center->create_time_event(200*1000, delayed_ack_event)); return false; } @@ -1315,7 +1315,7 @@ template void tcp::tcb::clear_delayed_ack() { if (_delayed_ack_fd) { center->delete_time_event(*_delayed_ack_fd); - _delayed_ack_fd.destroy(); + _delayed_ack_fd.reset(); } } @@ -1452,13 +1452,13 @@ tcp_sequence tcp::tcb::get_isn() { } template -Tub tcp::tcb::get_packet() { +std::optional tcp::tcb::get_packet() { _poll_active = false; if (_packetq.empty()) { output_one(); } - Tub p; + std::optional p; if (in_state(CLOSED)) { return p; } diff --git a/src/msg/async/dpdk/UserspaceEvent.cc b/src/msg/async/dpdk/UserspaceEvent.cc index 282dcef12f60..e0c57fd9bd94 100644 --- a/src/msg/async/dpdk/UserspaceEvent.cc +++ b/src/msg/async/dpdk/UserspaceEvent.cc @@ -33,9 +33,9 @@ int UserspaceEventManager::get_eventfd() fds.resize(fd + 1); } - Tub &impl = fds[fd]; + std::optional &impl = fds[fd]; ceph_assert(!impl); - impl.construct(); + impl.emplace(); ldout(cct, 20) << __func__ << " fd=" << fd << dendl; return fd; } @@ -46,7 +46,7 @@ int UserspaceEventManager::notify(int fd, int mask) if ((size_t)fd >= fds.size()) return -ENOENT; - Tub &impl = fds[fd]; + std::optional &impl = fds[fd]; if (!impl) return -ENOENT; @@ -77,7 +77,7 @@ void UserspaceEventManager::close(int fd) if ((size_t)fd >= fds.size()) return ; - Tub &impl = fds[fd]; + std::optional &impl = fds[fd]; if (!impl) return ; @@ -93,7 +93,7 @@ void UserspaceEventManager::close(int fd) } waiting_fds[impl->waiting_idx] = -1; } - impl.destroy(); + impl.reset(); } int UserspaceEventManager::poll(int *events, int *masks, int num_events, struct timeval *tp) @@ -109,7 +109,7 @@ int UserspaceEventManager::poll(int *events, int *masks, int num_events, struct continue; events[count] = fd; - Tub &impl = fds[fd]; + std::optional &impl = fds[fd]; ceph_assert(impl); masks[count] = impl->listening_mask & impl->activating_mask; ceph_assert(masks[count]); diff --git a/src/msg/async/dpdk/UserspaceEvent.h b/src/msg/async/dpdk/UserspaceEvent.h index 7e89517df873..49308aca439b 100644 --- a/src/msg/async/dpdk/UserspaceEvent.h +++ b/src/msg/async/dpdk/UserspaceEvent.h @@ -20,12 +20,12 @@ #include #include -#include #include +#include +#include #include "include/ceph_assert.h" #include "include/int_types.h" -#include "common/Tub.h" class CephContext; @@ -41,7 +41,7 @@ class UserspaceEventManager { CephContext *cct; int max_fd = 0; uint32_t max_wait_idx = 0; - std::vector > fds; + std::vector > fds; std::vector waiting_fds; std::list unused_fds; @@ -56,7 +56,7 @@ class UserspaceEventManager { if ((size_t)fd >= fds.size()) return -ENOENT; - Tub &impl = fds[fd]; + std::optional &impl = fds[fd]; if (!impl) return -ENOENT; @@ -74,7 +74,7 @@ class UserspaceEventManager { if ((size_t)fd >= fds.size()) return -ENOENT; - Tub &impl = fds[fd]; + std::optional &impl = fds[fd]; if (!impl) return -ENOENT; diff --git a/src/msg/async/dpdk/dpdk_rte.cc b/src/msg/async/dpdk/dpdk_rte.cc index 96cf896f8235..0937040a00b4 100644 --- a/src/msg/async/dpdk/dpdk_rte.cc +++ b/src/msg/async/dpdk/dpdk_rte.cc @@ -97,9 +97,9 @@ namespace dpdk { string2vector("-n"), string2vector(c->_conf->ms_dpdk_memory_channel), }; - Tub hugepages_path; + std::optional hugepages_path; if (!c->_conf->ms_dpdk_hugepages.empty()) { - hugepages_path.construct(c->_conf->ms_dpdk_hugepages); + hugepages_path.emplace(c->_conf->ms_dpdk_hugepages); } // If "hugepages" is not provided and DPDK PMD drivers mode is requested - diff --git a/src/msg/async/dpdk/net.cc b/src/msg/async/dpdk/net.cc index 6e361f182d15..c429c426cfc4 100644 --- a/src/msg/async/dpdk/net.cc +++ b/src/msg/async/dpdk/net.cc @@ -52,7 +52,7 @@ interface::interface(CephContext *cct, std::shared_ptr dev, EventCen auto idx = 0u; unsigned qid = center->get_id(); dev->queue_for_cpu(center->get_id()).register_packet_provider([this, idx, qid] () mutable { - Tub p; + std::optional p; for (size_t i = 0; i < _pkt_providers.size(); i++) { auto l3p = _pkt_providers[idx++](); if (idx == _pkt_providers.size()) diff --git a/src/msg/async/dpdk/net.h b/src/msg/async/dpdk/net.h index 63f0422b72cf..1966f847c948 100644 --- a/src/msg/async/dpdk/net.h +++ b/src/msg/async/dpdk/net.h @@ -80,7 +80,7 @@ class l3_protocol { ethernet_address to; Packet p; }; - using packet_provider_type = std::function ()>; + using packet_provider_type = std::function ()>; private: interface* _netif;