Interceptor *interceptor,
Socket *socket) {
if (interceptor) {
- auto action = interceptor->intercept(conn, Breakpoint(bp));
+ auto action = interceptor->intercept(
+ conn.get_local_shared_foreign_from_this(),
+ Breakpoint(bp));
socket->set_trap(type, action, &interceptor->blocker);
}
}
// supports CONTINUE/FAULT/BLOCK
if (conn.interceptor) {
auto action = conn.interceptor->intercept(
- conn, {custom_bp_t::SOCKET_CONNECTING});
+ conn.get_local_shared_foreign_from_this(),
+ {custom_bp_t::SOCKET_CONNECTING});
switch (action) {
case bp_action_t::CONTINUE:
return seastar::now();
return conn.interceptor->blocker.block();
default:
ceph_abort("unexpected action from trap");
+ return seastar::now();
}
} else {
return seastar::now();
has_socket = false;
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
- conn.interceptor->register_conn_replaced(conn);
+ conn.interceptor->register_conn_replaced(
+ conn.get_local_shared_foreign_from_this());
}
#endif
// close this connection because all the necessary information is delivered
#ifdef UNIT_TESTS_BUILT
if (conn.interceptor) {
auto action = conn.interceptor->intercept(
- conn, {custom_bp_t::SOCKET_ACCEPTED});
+ conn.get_local_shared_foreign_from_this(),
+ {custom_bp_t::SOCKET_ACCEPTED});
switch (action) {
case bp_action_t::CONTINUE:
break;
#ifdef UNIT_TESTS_BUILT
closed_clean = true;
if (conn.interceptor) {
- conn.interceptor->register_conn_closed(conn);
+ conn.interceptor->register_conn_closed(
+ conn.get_local_shared_foreign_from_this());
}
#endif
}).handle_exception([conn_ref = conn.shared_from_this(), this] (auto eptr) {
#include "crimson/net/Dispatcher.h"
#include "crimson/net/Messenger.h"
#include "crimson/net/Interceptor.h"
-#include "crimson/net/SocketConnection.h"
#include <map>
#include <random>
using crimson::net::Interceptor;
using crimson::net::Messenger;
using crimson::net::MessengerRef;
-using crimson::net::SocketConnection;
using crimson::net::SocketPolicy;
using crimson::net::tag_bp_t;
using namespace ceph::net::test;
}
private:
- void register_conn(SocketConnection& _conn) override {
- auto conn = _conn.get_local_shared_foreign_from_this();
+ void register_conn(ConnectionRef conn) override {
auto result = find_result(conn);
if (result != nullptr) {
logger().error("The connection [{}] {} already exists when register {}",
- result->index, *result->conn, _conn);
+ result->index, *result->conn, *conn);
ceph_abort();
}
unsigned index = results.size();
results.emplace_back(conn, index);
conns[conn] = index;
notify();
- logger().info("[{}] {} new connection registered", index, _conn);
+ logger().info("[{}] {} new connection registered", index, *conn);
}
- void register_conn_closed(SocketConnection& conn) override {
- auto result = find_result(conn.get_local_shared_foreign_from_this());
+ void register_conn_closed(ConnectionRef conn) override {
+ auto result = find_result(conn);
if (result == nullptr) {
- logger().error("Untracked closed connection: {}", conn);
+ logger().error("Untracked closed connection: {}", *conn);
ceph_abort();
}
result->state = conn_state_t::closed;
}
notify();
- logger().info("[{}] {} closed({})", result->index, conn, result->state);
+ logger().info("[{}] {} closed({})", result->index, *conn, result->state);
}
- void register_conn_ready(SocketConnection& conn) override {
- auto result = find_result(conn.get_local_shared_foreign_from_this());
+ void register_conn_ready(ConnectionRef conn) override {
+ auto result = find_result(conn);
if (result == nullptr) {
- logger().error("Untracked ready connection: {}", conn);
+ logger().error("Untracked ready connection: {}", *conn);
ceph_abort();
}
- ceph_assert(conn.is_connected());
+ ceph_assert(conn->is_connected());
notify();
- logger().info("[{}] {} ready", result->index, conn);
+ logger().info("[{}] {} ready", result->index, *conn);
}
- void register_conn_replaced(SocketConnection& conn) override {
- auto result = find_result(conn.get_local_shared_foreign_from_this());
+ void register_conn_replaced(ConnectionRef conn) override {
+ auto result = find_result(conn);
if (result == nullptr) {
- logger().error("Untracked replaced connection: {}", conn);
+ logger().error("Untracked replaced connection: {}", *conn);
ceph_abort();
}
result->state = conn_state_t::replaced;
- logger().info("[{}] {} {}", result->index, conn, result->state);
+ logger().info("[{}] {} {}", result->index, *conn, result->state);
}
- bp_action_t intercept(SocketConnection& conn, Breakpoint bp) override {
+ bp_action_t intercept(ConnectionRef conn, Breakpoint bp) override {
++breakpoints_counter[bp].counter;
- auto result = find_result(conn.get_local_shared_foreign_from_this());
+ auto result = find_result(conn);
if (result == nullptr) {
logger().error("Untracked intercepted connection: {}, at breakpoint {}({})",
- conn, bp, breakpoints_counter[bp].counter);
+ *conn, bp, breakpoints_counter[bp].counter);
ceph_abort();
}
auto it_cnt = it_bp->second.find(breakpoints_counter[bp].counter);
if (it_cnt != it_bp->second.end()) {
logger().info("[{}] {} intercepted {}({}) => {}",
- result->index, conn, bp,
+ result->index, *conn, bp,
breakpoints_counter[bp].counter, it_cnt->second);
return it_cnt->second;
}
}
logger().info("[{}] {} intercepted {}({})",
- result->index, conn, bp, breakpoints_counter[bp].counter);
+ result->index, *conn, bp, breakpoints_counter[bp].counter);
return bp_action_t::CONTINUE;
}
};