m->get_payload(),
m->get_middle(),
m->get_data());
- connection->outgoing_bl.append(message.get_buffer(session_stream_handlers));
+ if (!append_frame(message)) {
+ m->put();
+ return -EILSEQ;
+ }
ldout(cct, 5) << __func__ << " sending message m=" << m
<< " seq=" << m->get_seq() << " " << *m << dendl;
return rc;
}
-void ProtocolV2::append_keepalive() {
- ldout(cct, 10) << __func__ << dendl;
- auto keepalive_frame = KeepAliveFrame::Encode();
- connection->outgoing_bl.append(keepalive_frame.get_buffer(session_stream_handlers));
-}
-
-void ProtocolV2::append_keepalive_ack(utime_t ×tamp) {
- auto keepalive_ack_frame = KeepAliveFrameAck::Encode(timestamp);
- connection->outgoing_bl.append(keepalive_ack_frame.get_buffer(session_stream_handlers));
+template <class F>
+bool ProtocolV2::append_frame(F& frame) {
+ ceph::bufferlist bl;
+ try {
+ bl = frame.get_buffer(session_stream_handlers);
+ } catch (ceph::crypto::onwire::TxHandlerError &e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return false;
+ }
+ connection->outgoing_bl.append(bl);
+ return true;
}
void ProtocolV2::handle_message_ack(uint64_t seq) {
connection->write_lock.lock();
if (can_write) {
if (keepalive) {
- append_keepalive();
+ ldout(cct, 10) << __func__ << " appending keepalive" << dendl;
+ auto keepalive_frame = KeepAliveFrame::Encode();
+ if (!append_frame(keepalive_frame)) {
+ connection->write_lock.unlock();
+ connection->lock.lock();
+ fault();
+ connection->lock.unlock();
+ return;
+ }
keepalive = false;
}
if (left) {
ceph_le64 s;
s = in_seq;
- auto ack = AckFrame::Encode(in_seq);
- connection->outgoing_bl.append(ack.get_buffer(session_stream_handlers));
ldout(cct, 10) << __func__ << " try send msg ack, acked " << left
<< " messages" << dendl;
- ack_left -= left;
- left = ack_left;
- r = connection->_try_send(left);
+ auto ack_frame = AckFrame::Encode(in_seq);
+ if (append_frame(ack_frame)) {
+ ack_left -= left;
+ left = ack_left;
+ r = connection->_try_send(left);
+ } else {
+ r = -EILSEQ;
+ }
} else if (is_queued()) {
r = connection->_try_send();
}
CtPtr ProtocolV2::write(const std::string &desc,
CONTINUATION_TYPE<ProtocolV2> &next,
F &frame) {
- ceph::bufferlist bl = frame.get_buffer(session_stream_handlers);
+ ceph::bufferlist bl;
+ try {
+ bl = frame.get_buffer(session_stream_handlers);
+ } catch (ceph::crypto::onwire::TxHandlerError &e) {
+ ldout(cct, 1) << __func__ << " " << e.what() << dendl;
+ return _fault();
+ }
return write(desc, next, bl);
}
ldout(cct, 30) << __func__ << " got KEEPALIVE2 tag ..." << dendl;
connection->write_lock.lock();
- append_keepalive_ack(keepalive_frame.timestamp());
+ auto keepalive_ack_frame = KeepAliveFrameAck::Encode(keepalive_frame.timestamp());
+ if (!append_frame(keepalive_ack_frame)) {
+ connection->write_lock.unlock();
+ return _fault();
+ }
connection->write_lock.unlock();
ldout(cct, 20) << __func__ << " got KEEPALIVE2 "
CONTINUATION_TYPE<ProtocolV2> &next,
bufferlist &buffer);
+ template <class F>
+ bool append_frame(F& frame);
+
void requeue_sent();
uint64_t discard_requeued_up_to(uint64_t out_seq, uint64_t seq);
void reset_recv_state();
void prepare_send_message(uint64_t features, Message *m);
out_queue_entry_t _get_next_outgoing();
ssize_t write_message(Message *m, bool more);
- void append_keepalive();
- void append_keepalive_ack(utime_t ×tamp);
void handle_message_ack(uint64_t seq);
CONTINUATION_DECL(ProtocolV2, _wait_for_peer_banner);
struct nonce_t {
std::uint32_t random_seq;
std::uint64_t random_rest;
+
+ bool operator==(const nonce_t& rhs) const {
+ return !memcmp(this, &rhs, sizeof(*this));
+ }
} __attribute__((packed));
static_assert(sizeof(nonce_t) == AESGCM_IV_LEN);
CephContext* const cct;
std::unique_ptr<EVP_CIPHER_CTX, decltype(&::EVP_CIPHER_CTX_free)> ectx;
ceph::bufferlist buffer;
- nonce_t nonce;
+ nonce_t nonce, initial_nonce;
+ bool used_initial_nonce;
static_assert(sizeof(nonce) == AESGCM_IV_LEN);
public:
const nonce_t& nonce)
: cct(cct),
ectx(EVP_CIPHER_CTX_new(), EVP_CIPHER_CTX_free),
- nonce(nonce) {
+ nonce(nonce), initial_nonce(nonce), used_initial_nonce(false) {
ceph_assert_always(ectx);
ceph_assert_always(key.size() * CHAR_BIT == 128);
~AES128GCM_OnWireTxHandler() override {
::ceph::crypto::zeroize_for_security(&nonce, sizeof(nonce));
+ ::ceph::crypto::zeroize_for_security(&initial_nonce, sizeof(initial_nonce));
}
std::uint32_t calculate_segment_size(std::uint32_t size) override
void AES128GCM_OnWireTxHandler::reset_tx_handler(
std::initializer_list<std::uint32_t> update_size_sequence)
{
+ if (nonce == initial_nonce) {
+ if (used_initial_nonce) {
+ throw ceph::crypto::onwire::TxHandlerError("out of nonces");
+ }
+ used_initial_nonce = true;
+ }
+
if(1 != EVP_EncryptInit_ex(ectx.get(), nullptr, nullptr, nullptr,
reinterpret_cast<const unsigned char*>(&nonce))) {
throw std::runtime_error("EVP_EncryptInit_ex failed");