From: Gal Salomon
Date: Thu, 11 Apr 2024 16:37:10 +0000 (+0300)
Subject: rgw/s3select : fix for error flow.
X-Git-Tag: v19.2.3~140^2~5
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=09cba1d9fcde93aaaa03f855d9e9512fa362ebee;p=ceph.git

rgw/s3select : fix for error flow.

In some cases the error message did not reach the client and the connection was broken (invalid chunk length).
Fix another broken-connection case.
Make all data sources use the same API for sending the error response.
Add the boolean option rgw_disable_s3select; when enabled, RGW rejects s3select requests with an error message.
Editorial changes; rollback to ceph-master.
ceph/s3-tests#561 must be merged together with the ceph PR.

Signed-off-by: Gal Salomon
(cherry picked from commit 53ad57c9d4c18e369128f9e0f3a143f5608b56f9)
---

diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in
index eff70c92feb31..f1055981c53ca 100644
--- a/src/common/options/rgw.yaml.in
+++ b/src/common/options/rgw.yaml.in
@@ -51,6 +51,14 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_disable_s3select
+  type: bool
+  level: advanced
+  desc: disable the s3select operation; RGW will report an error and will return ERR_INVALID_REQUEST.
+  default: false
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_rados_tracing
   type: bool
   level: advanced
diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc
index 1b7dced278217..d989147cdc7d6 100644
--- a/src/rgw/rgw_s3select.cc
+++ b/src/rgw/rgw_s3select.cc
@@ -173,6 +173,7 @@ void aws_response_handler::init_success_response()
 
 void aws_response_handler::send_continuation_response()
 {
+  m_fp_chunk_encoding();
   set_continue_buffer();
   continue_result.resize(header_crc_size, '\0');
   get_buffer()->clear();
@@ -203,6 +204,7 @@ void aws_response_handler::init_stats_response()
 
 void aws_response_handler::init_end_response()
 {
+  m_fp_chunk_encoding();
   sql_result.resize(header_crc_size, '\0');
   get_buffer()->clear();
   header_size = create_header_end();
@@ -212,12 +214,13 @@ void aws_response_handler::init_end_response()
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::send_error_response(const char* error_message)
+void aws_response_handler::send_error_response(const char* error_code, const char* error_message, const char* resource_id)
 {
-  //currently not in use. need to change the s3-test, this error-response raises a boto3 exception
+  m_fp_chunk_encoding();
+  std::string out_error_msg = std::string(error_code) + " :" + std::string(error_message) + " :" + std::string(resource_id);
   error_result.resize(header_crc_size, '\0');
   get_buffer()->clear();
-  header_size = create_error_header_records(error_message);
+  header_size = create_error_header_records(out_error_msg.data());
   error_result.append(get_buffer()->c_str(), header_size);
 
   int buff_len = create_message(header_size,&error_result);
@@ -230,14 +233,17 @@ void aws_response_handler::send_success_response()
 #ifdef PAYLOAD_TAG
   sql_result.append(END_PAYLOAD_LINE);
 #endif
+  m_fp_chunk_encoding();
   int buff_len = create_message(m_success_header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
-    const char* error_message,
-    const char* resource_id)
+static constexpr const char* empty_error="--";
+
+void aws_response_handler::send_error_response_rgw_formatter(const char* error_code = empty_error,
+    const char* error_message = empty_error,
+    const char* resource_id = empty_error)
 {
   set_req_state_err(s, 0);
   dump_errno(s, 400);
@@ -254,6 +260,7 @@ void aws_response_handler::send_error_response_rgw_formatter(const char* error_c
 
 void aws_response_handler::send_progress_response()
 {
+  m_fp_chunk_encoding();
   std::string progress_payload = fmt::format("{}{}{}"
       , get_processed_size(), get_processed_size(), get_total_bytes_returned());
   sql_result.append(progress_payload);
@@ -264,6 +271,7 @@ void aws_response_handler::send_progress_response()
 
 void aws_response_handler::send_stats_response()
 {
+  m_fp_chunk_encoding();
   std::string stats_payload = fmt::format("{}{}{}"
       , get_processed_size(), get_processed_size(), get_total_bytes_returned());
   sql_result.append(stats_payload);
@@ -304,12 +312,10 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
     return 0;
   };
   fp_s3select_result_format = [this](std::string& result) {
-    fp_chunked_transfer_encoding();
     m_aws_response_handler.send_success_response();
     return 0;
   };
   fp_s3select_continue = [this](std::string& result) {
-    fp_chunked_transfer_encoding();
     m_aws_response_handler.send_continuation_response();
     return 0;
   };
@@ -330,6 +336,7 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
     }
     chunk_number++;
   };
+
 }
 
 RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
@@ -429,7 +436,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
 
   if (s3select_syntax.get_error_description().empty() == false) {
     //error-flow (syntax-error)
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
+    m_aws_response_handler.send_error_response(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
     ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
     return -1;
@@ -446,7 +453,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
 
   if (status < 0) {
     //error flow(processing-time)
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
+    m_aws_response_handler.send_error_response(s3select_processTime_error,m_s3_csv_object.get_error_description().data(),s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
     return -1;
@@ -458,7 +465,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   }
   ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
   if (enable_progress == true) {
-    fp_chunked_transfer_encoding();
     m_aws_response_handler.init_progress_response();
     m_aws_response_handler.send_progress_response();
   }
@@ -491,8 +497,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   }
   if (s3select_syntax.get_error_description().empty() == false) {
     //the SQL statement failed the syntax parser
-    fp_chunked_transfer_encoding();
-    m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+    m_aws_response_handler.send_error_response(s3select_syntax_error,m_s3_parquet_object.get_error_description().c_str(),s3select_resource_id);
 
     ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
     status = -1;
@@ -502,8 +507,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
 
     status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
     if (status < 0) {
-      fp_chunked_transfer_encoding();
-      m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+      m_aws_response_handler.send_error_response(s3select_processTime_error,m_s3_parquet_object.get_error_description().c_str(),s3select_resource_id);
       return -1;
     }
 
@@ -516,7 +520,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
 {
   int status = 0;
 
-  m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+  m_s3_json_object.set_external_system_functions(fp_s3select_continue,
       fp_s3select_result_format,
       fp_result_header_format,
       fp_debug_mesg);
@@ -537,7 +541,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   s3select_syntax.parse_query(m_sql_query.c_str());
   if (s3select_syntax.get_error_description().empty() == false) {
     //SQL statement is wrong(syntax).
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
+    m_aws_response_handler.send_error_response(s3select_syntax_error,
         s3select_syntax.get_error_description().c_str(),
         s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
@@ -559,7 +563,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   } catch(base_s3select_exception& e) {
     ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
     m_aws_response_handler.get_sql_result().append(e.what());
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
+    m_aws_response_handler.send_error_response(s3select_processTime_error,
         e.what(),
         s3select_resource_id);
     return -EINVAL;
@@ -568,13 +572,12 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
 
   m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
   if (status < 0) {
     //error flow(processing-time)
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
+    m_aws_response_handler.send_error_response(s3select_processTime_error,
         m_s3_json_object.get_error_description().c_str(),
         s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
     return -EINVAL;
   }
-  fp_chunked_transfer_encoding();
   if (length_post_processing-length_before_processing != 0) {
     m_aws_response_handler.send_success_response();
@@ -726,6 +729,21 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 #ifdef _ARROW_EXIST
   m_rgw_api.m_y = &y;
 #endif
+
+  if (!m_aws_response_handler.is_set()) {
+    m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
+  }
+
+  if(s->cct->_conf->rgw_disable_s3select == true)
+  {
+    std::string error_msg="s3select : is disabled by rgw_disable_s3select configuration parameter";
+    ldpp_dout(this, 10) << error_msg << dendl;
+    m_aws_response_handler.send_error_response_rgw_formatter(error_msg.data());
+
+    op_ret = -ERR_INVALID_REQUEST;
+    return;
+  }
+
   if (m_parquet_type) {
     //parquet processing
     range_request(0, 4, parquet_magic, y);
@@ -991,6 +1009,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
 
 int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
 {
+
   if (m_scan_range_ind == false){
     m_object_size_for_processing = s->obj_size;
   }
@@ -1005,7 +1024,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
     }
   }
   if (!m_aws_response_handler.is_set()) {
-    m_aws_response_handler.set(s, this);
+    m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
   }
   if (len == 0 && s->obj_size != 0) {
     return 0;
diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h
index f6b7b4d83d320..c40ac8837fffe 100644
--- a/src/rgw/rgw_s3select_private.h
+++ b/src/rgw/rgw_s3select_private.h
@@ -94,6 +94,7 @@ private:
   void push_header(const char* header_name, const char* header_value);
 
   int create_message(u_int32_t header_len,std::string*);
+  std::function<void(void)> m_fp_chunk_encoding;
 
 public:
   aws_response_handler(req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
@@ -110,10 +111,11 @@ public:
     return true;
   }
 
-  void set(req_state* ps, RGWOp* rgwop)
+  void set(req_state* ps, RGWOp* rgwop, std::function<void(void)>& fp_chunk_encoding)
   {
     s = ps;
     m_rgwop = rgwop;
+    m_fp_chunk_encoding = fp_chunk_encoding;
   }
 
   std::string& get_sql_result();
@@ -150,7 +152,9 @@ public:
 
   void init_stats_response();
 
-  void send_error_response(const char* error_message);
+  void send_error_response(const char* error_code,
+                           const char* error_message,
+                           const char* resource_id);
 
   void send_success_response();
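
The core of the fix is visible in the diff above: aws_response_handler now owns the chunked-transfer-encoding callback (m_fp_chunk_encoding) and invokes it before emitting any event-stream frame, so error frames reach the client with valid chunk framing instead of breaking the connection. Below is a minimal standalone sketch of that callback-injection pattern; response_handler, send_error_frame and start_chunked are illustrative names, not the actual RGW identifiers.

// Standalone sketch (illustrative names only; not the RGW classes).
// The handler stores a chunked-transfer-encoding callback and calls it before
// every frame it writes, including error frames.
#include <functional>
#include <iostream>
#include <string>

class response_handler {
  // set once; invoked before any frame so chunk framing is always valid
  std::function<void(void)> m_fp_chunk_encoding;

public:
  void set(std::function<void(void)>& fp_chunk_encoding) {
    m_fp_chunk_encoding = fp_chunk_encoding;
  }

  void send_error_frame(const char* error_code,
                        const char* error_message,
                        const char* resource_id) {
    m_fp_chunk_encoding();  // previously the caller had to remember this
    std::string out = std::string(error_code) + " :" + error_message + " :" + resource_id;
    std::cout << "error frame: " << out << "\n";  // stands in for the event-stream write
  }
};

int main() {
  bool chunked_started = false;
  // stands in for RGWSelectObj_ObjStore_S3::fp_chunked_transfer_encoding
  std::function<void(void)> start_chunked = [&chunked_started]() {
    if (!chunked_started) {
      std::cout << "begin chunked transfer encoding\n";
      chunked_started = true;
    }
  };

  response_handler handler;
  handler.set(start_chunked);
  // the error path now emits a well-formed frame instead of a truncated chunk
  handler.send_error_frame("syntax-error", "mismatched select expression", "resource-id");
  return 0;
}

Centralizing the call inside the handler is what lets the diff delete the scattered fp_chunked_transfer_encoding() calls at each CSV/Parquet/JSON call site.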
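
The patch also keeps two distinct error paths: send_error_response_rgw_formatter() answers with a plain HTTP 400 / ERR_INVALID_REQUEST body before any event stream has started (the path used when rgw_disable_s3select is set), while send_error_response() delivers the error as a frame inside an already-started event stream. A rough sketch of that split, using placeholder types and strings rather than the RGW request state:

// Rough sketch of the two error paths after this patch.
// request_state, the helpers, and the error-code strings are placeholders.
#include <iostream>
#include <string>

struct request_state {
  bool s3select_disabled = false;  // stands in for s->cct->_conf->rgw_disable_s3select
};

// pre-stream rejection: plain HTTP error body (the rgw-formatter path)
void reject_with_http_error(const std::string& msg) {
  std::cout << "HTTP 400, ERR_INVALID_REQUEST: " << msg << "\n";
}

// mid-stream failure: error frame inside the event stream (the unified path)
void send_error_frame(const std::string& code, const std::string& msg, const std::string& rid) {
  std::cout << "event-stream error frame: " << code << " :" << msg << " :" << rid << "\n";
}

void handle_select(request_state& s, bool query_fails) {
  if (s.s3select_disabled) {
    // rejected before any event-stream frame is written
    reject_with_http_error("s3select : is disabled by rgw_disable_s3select configuration parameter");
    return;
  }
  // ... event stream starts here ...
  if (query_fails) {
    // once the stream is open, errors must travel as frames; anything else
    // leaves the client with a broken connection (invalid chunk length)
    send_error_frame("processing-time-error", "query processing failed", "resource-id");
  }
}

int main() {
  request_state disabled;
  disabled.s3select_disabled = true;
  handle_select(disabled, false);

  request_state enabled;
  handle_select(enabled, true);
  return 0;
}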