)
{}
-Tub<l3_protocol::l3packet> arp::get_packet()
+std::optional<l3_protocol::l3packet> arp::get_packet()
{
- Tub<l3_protocol::l3packet> p;
+ std::optional<l3_protocol::l3packet> p;
if (!_packetq.empty()) {
p = std::move(_packetq.front());
_packetq.pop_front();
ethernet_address l2self() { return _netif->hw_address(); }
int process_packet(Packet p, ethernet_address from);
bool forward(forward_hash& out_hash_data, Packet& p, size_t off);
- Tub<l3_protocol::l3packet> get_packet();
+ std::optional<l3_protocol::l3packet> get_packet();
template <class l3_proto>
friend class arp_for;
};
return;
}
register_packet_provider([this] {
- Tub<Packet> p;
+ std::optional<Packet> p;
if (!_proxy_packetq.empty()) {
p = std::move(_proxy_packetq.front());
_proxy_packetq.pop_front();
return false;
}
-inline Tub<Packet> DPDKQueuePair::from_mbuf_lro(rte_mbuf* m)
+inline std::optional<Packet> DPDKQueuePair::from_mbuf_lro(rte_mbuf* m)
{
_frags.clear();
_bufs.clear();
_frags.begin(), _frags.end(), make_deleter(std::move(del)));
}
-inline Tub<Packet> DPDKQueuePair::from_mbuf(rte_mbuf* m)
+inline std::optional<Packet> DPDKQueuePair::from_mbuf(rte_mbuf* m)
{
_rx_free_pkts.push_back(m);
_num_rx_free_segs += m->nb_segs;
struct rte_mbuf *m = bufs[i];
offload_info oi;
- Tub<Packet> p = from_mbuf(m);
+ std::optional<Packet> p = from_mbuf(m);
// Drop the packet if translation above has failed
if (!p) {
#ifndef CEPH_DPDK_DEV_H
#define CEPH_DPDK_DEV_H
-#include <memory>
#include <functional>
+#include <memory>
+#include <optional>
#include <rte_config.h>
#include <rte_common.h>
#include <rte_ethdev.h>
+#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_version.h>
#include "include/page.h"
-#include "common/Tub.h"
#include "common/perf_counters.h"
#include "msg/async/Event.h"
#include "const.h"
class DPDKDevice;
class DPDKWorker;
+
class DPDKQueuePair {
- using packet_provider_type = std::function<Tub<Packet> ()>;
+ using packet_provider_type = std::function<std::optional<Packet> ()>;
public:
void configure_proxies(const std::map<unsigned, float>& cpu_weights);
// build REdirection TAble for cpu_weights map: target cpu -> weight
private:
struct rte_mbuf _mbuf;
MARKER private_start;
- Tub<Packet> _p;
+ std::optional<Packet> _p;
phys_addr_t _buf_physaddr;
uint16_t _data_off;
// TRUE if underlying mbuf has been used in the zero-copy flow
}
void rx_start() {
- _rx_poller.construct(this);
+ _rx_poller.emplace(this);
}
uint32_t send(circular_buffer<Packet>& pb) {
* @return a "optional" object representing the newly received data if in an
* "engaged" state or an error if in a "disengaged" state.
*/
- Tub<Packet> from_mbuf(rte_mbuf* m);
+ std::optional<Packet> from_mbuf(rte_mbuf* m);
/**
* Transform an LRO rte_mbuf cluster into the "packet" object.
* @return a "optional" object representing the newly received LRO packet if
* in an "engaged" state or an error if in a "disengaged" state.
*/
- Tub<Packet> from_mbuf_lro(rte_mbuf* m);
+ std::optional<Packet> from_mbuf_lro(rte_mbuf* m);
private:
CephContext *cct;
std::vector<packet_provider_type> _pkt_providers;
- Tub<std::array<uint8_t, 128>> _sw_reta;
+ std::optional<std::array<uint8_t, 128>> _sw_reta;
circular_buffer<Packet> _proxy_packetq;
stream<Packet> _rx_stream;
circular_buffer<Packet> _tx_packetq;
return qp->poll_rx_once();
}
};
- Tub<DPDKRXPoller> _rx_poller;
+ std::optional<DPDKRXPoller> _rx_poller;
class DPDKTXGCPoller : public EventCenter::Poller {
DPDKQueuePair *qp;
#define CEPH_MSG_DPDKSTACK_H
#include <functional>
+#include <optional>
#include "common/ceph_context.h"
-#include "common/Tub.h"
#include "msg/async/Stack.h"
#include "net.h"
typename Protocol::connection _conn;
uint32_t _cur_frag = 0;
uint32_t _cur_off = 0;
- Tub<Packet> _buf;
- Tub<bufferptr> _cache_ptr;
+ std::optional<Packet> _buf;
+ std::optional<bufferptr> _cache_ptr;
public:
explicit NativeConnectedSocketImpl(typename Protocol::connection conn)
size_t off = 0;
while (left > 0) {
if (!_cache_ptr) {
- _cache_ptr.construct();
+ _cache_ptr.emplace();
r = zero_copy_read(*_cache_ptr);
if (r <= 0) {
- _cache_ptr.destroy();
+ _cache_ptr.reset();
if (r == -EAGAIN)
break;
return r;
_cache_ptr->copy_out(0, _cache_ptr->length(), buf+off);
left -= _cache_ptr->length();
off += _cache_ptr->length();
- _cache_ptr.destroy();
+ _cache_ptr.reset();
} else {
_cache_ptr->copy_out(0, left, buf+off);
_cache_ptr->set_offset(_cache_ptr->offset() + left);
if (++_cur_frag == _buf->nr_frags()) {
_cur_frag = 0;
_cur_off = 0;
- _buf.destroy();
+ _buf.reset();
} else {
_cur_off += f.size;
}
}
}
-Tub<l3_protocol::l3packet> ipv4::get_packet() {
+std::optional<l3_protocol::l3packet> ipv4::get_packet() {
// _packetq will be mostly empty here unless it hold remnants of previously
// fragmented packet
if (_packetq.empty()) {
}
}
- Tub<l3_protocol::l3packet> p;
+ std::optional<l3_protocol::l3packet> p;
if (!_packetq.empty()) {
p = std::move(_packetq.front());
_packetq.pop_front();
ethernet_address e_dst;
ip_protocol_num proto_num;
};
- using packet_provider_type = std::function<Tub<l4packet> ()>;
+ using packet_provider_type = std::function<std::optional<l4packet> ()>;
static void tcp_pseudo_header_checksum(checksummer& csum, ipv4_address src, ipv4_address dst, uint16_t len) {
csum.sum_many(src.ip, dst.ip, uint8_t(0), uint8_t(ip_protocol_num::tcp), len);
}
explicit icmp(CephContext *c, inet_type& inet)
: cct(c), _inet(inet), _queue_space(c, "DPDK::icmp::_queue_space", 212992) {
_inet.register_packet_provider([this] {
- Tub<ipv4_traits::l4packet> l4p;
+ std::optional<ipv4_traits::l4packet> l4p;
if (!_packetq.empty()) {
l4p = std::move(_packetq.front());
_packetq.pop_front();
private:
interface* _netif;
std::vector<ipv4_traits::packet_provider_type> _pkt_providers;
- Tub<uint64_t> frag_timefd;
+ std::optional<uint64_t> frag_timefd;
EventCallbackRef frag_handler;
arp _global_arp;
arp_for<ipv4> _arp;
private:
int handle_received_packet(Packet p, ethernet_address from);
bool forward(forward_hash& out_hash_data, Packet& p, size_t off);
- Tub<l3_protocol::l3packet> get_packet();
+ std::optional<l3_protocol::l3packet> get_packet();
bool in_my_netmask(ipv4_address a) const {
return !((a.ip ^ _host_address.ip) & _netmask.ip);
}
}
void frag_arm(utime_t now) {
auto tp = now + _frag_timeout;
- frag_timefd.construct(center->create_time_event(tp.to_nsec() / 1000, frag_handler));
+ frag_timefd = center->create_time_event(tp.to_nsec() / 1000, frag_handler);
}
void frag_arm() {
auto now = ceph_clock_now();
- frag_timefd.construct(center->create_time_event(now.to_nsec() / 1000, frag_handler));
+ frag_timefd = center->create_time_event(now.to_nsec() / 1000, frag_handler);
}
public:
#include <iosfwd>
#include "include/types.h"
-#include "common/Tub.h"
#include "common/deleter.h"
#include "msg/async/Event.h"
bool reassembled = false;
uint16_t tso_seg_size = 0;
// HW stripped VLAN header (CPU order)
- Tub<uint16_t> vlan_tci;
+ std::optional<uint16_t> vlan_tci;
};
// Zero-copy friendly packet class
uint16_t _nr_frags = 0;
uint16_t _allocated_frags;
offload_info _offload_info;
- Tub<uint32_t> rss_hash;
+ std::optional<uint32_t> rss_hash;
char data[internal_data_size]; // only frags[0] may use
unsigned headroom = internal_data_size; // in data
// FIXME: share data/frags space
n->_nr_frags = old->_nr_frags;
n->headroom = old->headroom;
n->_offload_info = old->_offload_info;
- n->rss_hash.construct(old->rss_hash);
+ n->rss_hash = old->rss_hash;
std::copy(old->frags, old->frags + old->_nr_frags, n->frags);
old->copy_internal_fragment_to(n.get());
return n;
_impl = impl::allocate_if_needed(std::move(_impl), extra);
}
}
- Tub<uint32_t> rss_hash() {
+ std::optional<uint32_t> rss_hash() {
return _impl->rss_hash;
}
void set_rss_hash(uint32_t hash) {
- _impl->rss_hash.construct(hash);
+ _impl->rss_hash = hash;
}
private:
void linearize(size_t at_frag, size_t desired_size);
public:
C_handle_delayed_ack(tcb *t): tc(t) { }
void do_request(uint64_t r) {
- tc->_delayed_ack_fd.destroy();
+ tc->_delayed_ack_fd.reset();
tc->_nr_full_seg_received = 0;
tc->output();
}
public:
C_handle_retransmit(tcb *t): tc(t) { }
void do_request(uint64_t r) {
- tc->retransmit_fd.destroy();
+ tc->retransmit_fd.reset();
tc->retransmit();
}
};
public:
C_handle_persist(tcb *t): tc(t) { }
void do_request(uint64_t r) {
- tc->persist_fd.destroy();
+ tc->persist_fd.reset();
tc->persist();
}
};
int16_t _errno = 1;
tcp_option _option;
EventCallbackRef delayed_ack_event;
- Tub<uint64_t> _delayed_ack_fd;
+ std::optional<uint64_t> _delayed_ack_fd;
// Retransmission timeout
std::chrono::microseconds _rto{1000*1000};
std::chrono::microseconds _persist_time_out{1000*1000};
static constexpr std::chrono::microseconds _rto_clk_granularity{1000};
static constexpr uint16_t _max_nr_retransmit{5};
EventCallbackRef retransmit_event;
- Tub<uint64_t> retransmit_fd;
+ std::optional<uint64_t> retransmit_fd;
EventCallbackRef persist_event;
EventCallbackRef all_data_ack_event;
- Tub<uint64_t> persist_fd;
+ std::optional<uint64_t> persist_fd;
uint16_t _nr_full_seg_received = 0;
struct isn_secret {
// 512 bits secretkey for ISN generating
bool is_all_data_acked();
int send(Packet p);
void connect();
- Tub<Packet> read();
+ std::optional<Packet> read();
void close();
void remove_from_tcbs() {
auto id = connid{_local_ip, _foreign_ip, _local_port, _foreign_port};
_tcp._tcbs.erase(id);
}
- Tub<typename InetTraits::l4packet> get_packet();
+ std::optional<typename InetTraits::l4packet> get_packet();
void output() {
if (!_poll_active) {
_poll_active = true;
void start_retransmit_timer() {
if (retransmit_fd)
center->delete_time_event(*retransmit_fd);
- retransmit_fd.construct(center->create_time_event(_rto.count(), retransmit_event));
+ retransmit_fd.emplace(center->create_time_event(_rto.count(), retransmit_event));
};
void stop_retransmit_timer() {
if (retransmit_fd) {
center->delete_time_event(*retransmit_fd);
- retransmit_fd.destroy();
+ retransmit_fd.reset();
}
};
void start_persist_timer() {
if (persist_fd)
center->delete_time_event(*persist_fd);
- persist_fd.construct(center->create_time_event(_persist_time_out.count(), persist_event));
+ persist_fd.emplace(center->create_time_event(_persist_time_out.count(), persist_event));
};
void stop_persist_timer() {
if (persist_fd) {
center->delete_time_event(*persist_fd);
- persist_fd.destroy();
+ persist_fd.reset();
}
};
void persist();
int send(Packet p) {
return _tcb->send(std::move(p));
}
- Tub<Packet> read() {
+ std::optional<Packet> read() {
return _tcb->read();
}
int16_t get_errno() const {
_fd = _tcp.manager.get_eventfd();
return 0;
}
- Tub<connection> accept() {
- Tub<connection> c;
+ std::optional<connection> accept() {
+ std::optional<connection> c;
if (!_q.empty()) {
c = std::move(_q.front());
_q.pop();
_e(_rd()), _queue_space(cct, "DPDK::tcp::queue_space", 81920) {
int tcb_polled = 0u;
_inet.register_packet_provider([this, tcb_polled] () mutable {
- Tub<typename InetTraits::l4packet> l4p;
+ std::optional<typename InetTraits::l4packet> l4p;
auto c = _poll_tcbs.size();
if (!_packetq.empty() && (!(tcb_polled % 128) || c == 0)) {
l4p = std::move(_packetq.front());
}
template <typename InetTraits>
-Tub<Packet> tcp<InetTraits>::tcb::read() {
- Tub<Packet> p;
+std::optional<Packet> tcp<InetTraits>::tcb::read() {
+ std::optional<Packet> p;
if (_rcv.data.empty())
return p;
- p.construct();
+ p.emplace();
for (auto&& q : _rcv.data) {
p->append(std::move(q));
}
_nr_full_seg_received = 0;
if (_delayed_ack_fd) {
center->delete_time_event(*_delayed_ack_fd);
- _delayed_ack_fd.destroy();
+ _delayed_ack_fd.reset();
}
return true;
}
_nr_full_seg_received = 0;
if (_delayed_ack_fd) {
center->delete_time_event(*_delayed_ack_fd);
- _delayed_ack_fd.destroy();
+ _delayed_ack_fd.reset();
}
return true;
}
// If the timer is not armed, schedule a delayed ACK.
// The maximum delayed ack timer allowed by RFC1122 is 500ms, most
// implementations use 200ms.
- _delayed_ack_fd.construct(center->create_time_event(200*1000, delayed_ack_event));
+ _delayed_ack_fd.emplace(center->create_time_event(200*1000, delayed_ack_event));
return false;
}
void tcp<InetTraits>::tcb::clear_delayed_ack() {
if (_delayed_ack_fd) {
center->delete_time_event(*_delayed_ack_fd);
- _delayed_ack_fd.destroy();
+ _delayed_ack_fd.reset();
}
}
}
template <typename InetTraits>
-Tub<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
+std::optional<typename InetTraits::l4packet> tcp<InetTraits>::tcb::get_packet() {
_poll_active = false;
if (_packetq.empty()) {
output_one();
}
- Tub<typename InetTraits::l4packet> p;
+ std::optional<typename InetTraits::l4packet> p;
if (in_state(CLOSED)) {
return p;
}
fds.resize(fd + 1);
}
- Tub<UserspaceFDImpl> &impl = fds[fd];
+ std::optional<UserspaceFDImpl> &impl = fds[fd];
ceph_assert(!impl);
- impl.construct();
+ impl.emplace();
ldout(cct, 20) << __func__ << " fd=" << fd << dendl;
return fd;
}
if ((size_t)fd >= fds.size())
return -ENOENT;
- Tub<UserspaceFDImpl> &impl = fds[fd];
+ std::optional<UserspaceFDImpl> &impl = fds[fd];
if (!impl)
return -ENOENT;
if ((size_t)fd >= fds.size())
return ;
- Tub<UserspaceFDImpl> &impl = fds[fd];
+ std::optional<UserspaceFDImpl> &impl = fds[fd];
if (!impl)
return ;
}
waiting_fds[impl->waiting_idx] = -1;
}
- impl.destroy();
+ impl.reset();
}
int UserspaceEventManager::poll(int *events, int *masks, int num_events, struct timeval *tp)
continue;
events[count] = fd;
- Tub<UserspaceFDImpl> &impl = fds[fd];
+ std::optional<UserspaceFDImpl> &impl = fds[fd];
ceph_assert(impl);
masks[count] = impl->listening_mask & impl->activating_mask;
ceph_assert(masks[count]);
#include <errno.h>
#include <string.h>
-#include <vector>
#include <list>
+#include <optional>
+#include <vector>
#include "include/ceph_assert.h"
#include "include/int_types.h"
-#include "common/Tub.h"
class CephContext;
CephContext *cct;
int max_fd = 0;
uint32_t max_wait_idx = 0;
- std::vector<Tub<UserspaceFDImpl> > fds;
+ std::vector<std::optional<UserspaceFDImpl> > fds;
std::vector<int> waiting_fds;
std::list<uint32_t> unused_fds;
if ((size_t)fd >= fds.size())
return -ENOENT;
- Tub<UserspaceFDImpl> &impl = fds[fd];
+ std::optional<UserspaceFDImpl> &impl = fds[fd];
if (!impl)
return -ENOENT;
if ((size_t)fd >= fds.size())
return -ENOENT;
- Tub<UserspaceFDImpl> &impl = fds[fd];
+ std::optional<UserspaceFDImpl> &impl = fds[fd];
if (!impl)
return -ENOENT;
string2vector("-n"), string2vector(c->_conf->ms_dpdk_memory_channel),
};
- Tub<std::string> hugepages_path;
+ std::optional<std::string> hugepages_path;
if (!c->_conf->ms_dpdk_hugepages.empty()) {
- hugepages_path.construct(c->_conf->ms_dpdk_hugepages);
+ hugepages_path.emplace(c->_conf->ms_dpdk_hugepages);
}
// If "hugepages" is not provided and DPDK PMD drivers mode is requested -
auto idx = 0u;
unsigned qid = center->get_id();
dev->queue_for_cpu(center->get_id()).register_packet_provider([this, idx, qid] () mutable {
- Tub<Packet> p;
+ std::optional<Packet> p;
for (size_t i = 0; i < _pkt_providers.size(); i++) {
auto l3p = _pkt_providers[idx++]();
if (idx == _pkt_providers.size())
ethernet_address to;
Packet p;
};
- using packet_provider_type = std::function<Tub<l3packet> ()>;
+ using packet_provider_type = std::function<std::optional<l3packet> ()>;
private:
interface* _netif;