BuildRequires: librabbitmq-devel
%endif
%if 0%{with kafka_endpoint}
-BuildRequires: librdkafka-devel
+BuildRequires: librdkafka-devel >= 1.1.0
%endif
%if 0%{with lua_packages}
Requires: lua-devel
librabbitmq-dev,
libre2-dev,
libutf8proc-dev (>= 2.2.0),
- librdkafka-dev,
+ librdkafka-dev (>= 1.1.0),
libthrift-dev (>= 0.13.0),
libyaml-cpp-dev (>= 0.6),
libzstd-dev <pkg.ceph.check>,
find_package(RabbitMQ REQUIRED)
endif()
if(WITH_RADOSGW_KAFKA_ENDPOINT)
- find_package(RDKafka 0.9.2 REQUIRED)
+ find_package(RDKafka 1.1.0 REQUIRED)
endif()
target_link_libraries(rgw_a
const std::string& _password,
const boost::optional<const std::string&>& _ca_location,
const boost::optional<const std::string&>& _mechanism,
- bool _ssl)
- : broker(_broker), user(_user), password(_password), ssl(_ssl) {
+ bool _ssl,
+ bool _verify_ssl)
+ : broker(_broker), user(_user), password(_password), ssl(_ssl),
+ verify_ssl(_verify_ssl) {
if (_ca_location.has_value()) {
ca_location = _ca_location.get();
}
bool operator==(const connection_id_t& lhs, const connection_id_t& rhs) {
return lhs.broker == rhs.broker && lhs.user == rhs.user &&
lhs.password == rhs.password && lhs.ca_location == rhs.ca_location &&
- lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl;
+ lhs.mechanism == rhs.mechanism && lhs.ssl == rhs.ssl &&
+ lhs.verify_ssl == rhs.verify_ssl;
}
struct connection_id_hasher {
boost::hash_combine(h, k.ca_location);
boost::hash_combine(h, k.mechanism);
boost::hash_combine(h, k.ssl);
+ boost::hash_combine(h, k.verify_ssl);
return h;
}
};
CallbackList callbacks;
const std::string broker;
const bool use_ssl;
- const bool verify_ssl; // TODO currently ignored, not supported in librdkafka v0.11.6
+ const bool verify_ssl;
const boost::optional<std::string> ca_location;
const std::string user;
const std::string password;
} else {
ldout(conn->cct, 20) << "Kafka connect: using default CA location" << dendl;
}
- // Note: when librdkafka.1.0 is available the following line could be uncommented instead of the callback setting call
- // if (rd_kafka_conf_set(conn->conf, "enable.ssl.certificate.verification", "0", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) goto conf_error;
+ if (rd_kafka_conf_set(conf.get(), "enable.ssl.certificate.verification",
+ conn->verify_ssl ? "true" : "false",
+ errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
+ goto conf_error;
+ }
ldout(conn->cct, 20) << "Kafka connect: successfully configured security" << dendl;
} else if (!conn->user.empty()) {
}
connection_id_t tmp_id(broker_list, user, password, ca_location, mechanism,
- use_ssl);
+ use_ssl, verify_ssl);
std::lock_guard lock(connections_lock);
const auto it = connections.find(tmp_id);
// note that ssl vs. non-ssl connection to the same host are two separate connections
std::string ca_location;
std::string mechanism;
bool ssl = false;
+ bool verify_ssl = true;
connection_id_t() = default;
connection_id_t(const std::string& _broker,
const std::string& _user,
const std::string& _password,
const boost::optional<const std::string&>& _ca_location,
const boost::optional<const std::string&>& _mechanism,
- bool _ssl);
+ bool _ssl,
+ bool _verify_ssl);
};
std::string to_string(const connection_id_t& id);
conn2.delete_bucket(bucket_name)
-def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False):
+def kafka_security(security_type, mechanism='PLAIN', use_topic_attrs_for_creds=False,
+ verify_ssl=True, include_ca_location=True):
""" test pushing kafka notifications securely to master """
# Setup SCRAM users if needed
if mechanism.startswith('SCRAM'):
endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=false&mechanism='+mechanism
elif security_type == 'SASL_SSL':
KAFKA_DIR = os.environ['KAFKA_DIR']
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt&mechanism='+mechanism
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&mechanism='+mechanism
+ if include_ca_location:
+ endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
if use_topic_attrs_for_creds:
endpoint_args += '&user-name=alice&password=alice-secret'
else:
KAFKA_DIR = os.environ['KAFKA_DIR']
- endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true&ca-location='+KAFKA_DIR+'/y-ca.crt'
+ endpoint_args = 'push-endpoint='+endpoint_address+'&kafka-ack-level=broker&use-ssl=true'
+ if include_ca_location:
+ endpoint_args += '&ca-location='+KAFKA_DIR+'/y-ca.crt'
+
+ if security_type in ('SSL', 'SASL_SSL') and not verify_ssl:
+ endpoint_args += '&verify-ssl=false'
topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args)
kafka_security('SSL')
+@pytest.mark.kafka_security_test
+def test_notification_kafka_security_ssl_skip_verification_without_ca():
+ kafka_security('SSL', verify_ssl=False, include_ca_location=False)
+
+
@pytest.mark.kafka_security_test
def test_notification_kafka_security_ssl_sasl():
kafka_security('SASL_SSL')