]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/s3select : fix for error flow.
authorGal Salomon <gal.salomon@gmail.com>
Thu, 11 Apr 2024 16:37:10 +0000 (19:37 +0300)
committerGal Salomon <gal.salomon@gmail.com>
Fri, 25 Apr 2025 00:00:22 +0000 (03:00 +0300)
in some cases the error message is not returned to the client and the connection breaks (invalid chunk length)
fix another broken connection
all data sources now use the same API for sending the error response
add the option rgw_disable_s3select (boolean). when this option is turned on, s3select requests are rejected with an error message
editorial.
rollback to ceph-master. the ceph/s3-tests#561 must be merged with ceph-PR

Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
(cherry picked from commit 53ad57c9d4c18e369128f9e0f3a143f5608b56f9)

src/common/options/rgw.yaml.in
src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h

index eff70c92feb316d35f0444e744bdda62df26fbd5..f1055981c53ca626b1053ba4f3007db90ac5cdbc 100644 (file)
@@ -51,6 +51,14 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_disable_s3select
+  type: bool
+  level: advanced
+  desc: disable the s3select operation; RGW will report an error and will return ERR_INVALID_REQUEST.
+  default: false
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_rados_tracing
   type: bool
   level: advanced
index 1b7dced2782176987a5dc16c2bd5d29544f04566..d989147cdc7d67e14bf7ea9410376f289f62ba11 100644 (file)
@@ -173,6 +173,7 @@ void aws_response_handler::init_success_response()
 
 void aws_response_handler::send_continuation_response()
 {
+  m_fp_chunk_encoding();
   set_continue_buffer();
   continue_result.resize(header_crc_size, '\0');
   get_buffer()->clear();
@@ -203,6 +204,7 @@ void aws_response_handler::init_stats_response()
 
 void aws_response_handler::init_end_response()
 {
+  m_fp_chunk_encoding();
   sql_result.resize(header_crc_size, '\0');
   get_buffer()->clear();
   header_size = create_header_end();
@@ -212,12 +214,13 @@ void aws_response_handler::init_end_response()
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::send_error_response(const char* error_message)
+void aws_response_handler::send_error_response(const char* error_code, const char* error_message, const char* resource_id)
 {
-  //currently not in use. need to change the s3-test, this error-response raises a boto3 exception
+  m_fp_chunk_encoding();
+  std::string out_error_msg = std::string(error_code) + " :" + std::string(error_message) + " :" + std::string(resource_id);
   error_result.resize(header_crc_size, '\0');
   get_buffer()->clear();
-  header_size = create_error_header_records(error_message);
+  header_size = create_error_header_records(out_error_msg.data());
   error_result.append(get_buffer()->c_str(), header_size);
 
   int buff_len = create_message(header_size,&error_result);
@@ -230,14 +233,17 @@ void aws_response_handler::send_success_response()
 #ifdef PAYLOAD_TAG
   sql_result.append(END_PAYLOAD_LINE);
 #endif
+  m_fp_chunk_encoding();
   int buff_len = create_message(m_success_header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
-    const char* error_message,
-    const char* resource_id)
+static constexpr const char* empty_error="--";
+
+void aws_response_handler::send_error_response_rgw_formatter(const char* error_code = empty_error,
+    const char* error_message = empty_error,
+    const char* resource_id = empty_error)
 {
   set_req_state_err(s, 0);
   dump_errno(s, 400);
@@ -254,6 +260,7 @@ void aws_response_handler::send_error_response_rgw_formatter(const char* error_c
 
 void aws_response_handler::send_progress_response()
 {
+  m_fp_chunk_encoding();
   std::string progress_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
                                  , get_processed_size(), get_processed_size(), get_total_bytes_returned());
   sql_result.append(progress_payload);
@@ -264,6 +271,7 @@ void aws_response_handler::send_progress_response()
 
 void aws_response_handler::send_stats_response()
 {
+  m_fp_chunk_encoding();
   std::string stats_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
                                           , get_processed_size(), get_processed_size(), get_total_bytes_returned());
   sql_result.append(stats_payload);
@@ -304,12 +312,10 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
     return 0;
   };
   fp_s3select_result_format = [this](std::string& result) {
-    fp_chunked_transfer_encoding();
     m_aws_response_handler.send_success_response();
     return 0;
   };
   fp_s3select_continue = [this](std::string& result) {
-    fp_chunked_transfer_encoding();
     m_aws_response_handler.send_continuation_response();
     return 0;
   };
@@ -330,6 +336,7 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
     } 
     chunk_number++; 
   };
+
 }
 
 RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
@@ -429,7 +436,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
 
   if (s3select_syntax.get_error_description().empty() == false) {
     //error-flow (syntax-error)
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
+    m_aws_response_handler.send_error_response(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
     ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
     return -1;
@@ -446,7 +453,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
 
     if (status < 0) {
       //error flow(processing-time)
-      m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
+      m_aws_response_handler.send_error_response(s3select_processTime_error,m_s3_csv_object.get_error_description().data(),s3select_resource_id);
       
       ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
       return -1;
@@ -458,7 +465,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   }
   ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
   if (enable_progress == true) {
-    fp_chunked_transfer_encoding();
     m_aws_response_handler.init_progress_response();
     m_aws_response_handler.send_progress_response();
   }
@@ -491,8 +497,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   }
   if (s3select_syntax.get_error_description().empty() == false) {
     //the SQL statement failed the syntax parser
-    fp_chunked_transfer_encoding();
-    m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+    m_aws_response_handler.send_error_response(s3select_syntax_error,m_s3_parquet_object.get_error_description().c_str(),s3select_resource_id);
 
     ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
     status = -1;
@@ -502,8 +507,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
     status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
     if (status < 0) {
 
-      fp_chunked_transfer_encoding();
-      m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+      m_aws_response_handler.send_error_response(s3select_processTime_error,m_s3_parquet_object.get_error_description().c_str(),s3select_resource_id);
 
       return -1;
     }
@@ -516,7 +520,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
 {
   int status = 0;
   
-  m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+  m_s3_json_object.set_external_system_functions(fp_s3select_continue,
                                                fp_s3select_result_format,
                                                fp_result_header_format,
                                                fp_debug_mesg);
@@ -537,7 +541,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   s3select_syntax.parse_query(m_sql_query.c_str());
   if (s3select_syntax.get_error_description().empty() == false) {
   //SQL statement is wrong(syntax).
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
+    m_aws_response_handler.send_error_response(s3select_syntax_error,
       s3select_syntax.get_error_description().c_str(),
       s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
@@ -559,7 +563,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   } catch(base_s3select_exception& e) {
     ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
     m_aws_response_handler.get_sql_result().append(e.what());
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
+    m_aws_response_handler.send_error_response(s3select_processTime_error,
        e.what(),
        s3select_resource_id);
     return -EINVAL;
@@ -568,13 +572,12 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
   if (status < 0) {
     //error flow(processing-time)
-    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
+    m_aws_response_handler.send_error_response(s3select_processTime_error,
        m_s3_json_object.get_error_description().c_str(),
        s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
     return -EINVAL;
   }
-  fp_chunked_transfer_encoding();
 
   if (length_post_processing-length_before_processing != 0) {
     m_aws_response_handler.send_success_response();
@@ -726,6 +729,21 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 #ifdef _ARROW_EXIST
   m_rgw_api.m_y = &y;
 #endif
+
+  if (!m_aws_response_handler.is_set()) {
+    m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
+  }
+
+  if(s->cct->_conf->rgw_disable_s3select == true)
+  {
+      std::string error_msg="s3select : is disabled by rgw_disable_s3select configuration parameter";
+      ldpp_dout(this, 10) << error_msg << dendl;
+      m_aws_response_handler.send_error_response_rgw_formatter(error_msg.data());
+      
+      op_ret = -ERR_INVALID_REQUEST;
+      return;
+  }
+
   if (m_parquet_type) {
     //parquet processing
     range_request(0, 4, parquet_magic, y);
@@ -991,6 +1009,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
 
 int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
 {
+
   if (m_scan_range_ind == false){
     m_object_size_for_processing = s->obj_size;
   }
@@ -1005,7 +1024,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
       }
   }
   if (!m_aws_response_handler.is_set()) {
-    m_aws_response_handler.set(s, this);
+    m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
   }
   if (len == 0 && s->obj_size != 0) {
     return 0;
index f6b7b4d83d32050334a9290f6b342ec7a5a2c226..c40ac8837fffe112af0b1da5657459df999cd2fc 100644 (file)
@@ -94,6 +94,7 @@ private:
   void push_header(const char* header_name, const char* header_value);
 
   int create_message(u_int32_t header_len,std::string*);
+  std::function<void(void)> m_fp_chunk_encoding;
 
 public:
   aws_response_handler(req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
@@ -110,10 +111,11 @@ public:
     return true;
   }
 
-  void set(req_state* ps, RGWOp* rgwop)
+  void set(req_state* ps, RGWOp* rgwop, std::function<void(void)>& fp_chunk_encoding)
   {
     s = ps;
     m_rgwop = rgwop;
+    m_fp_chunk_encoding = fp_chunk_encoding;
   }
 
   std::string& get_sql_result();
@@ -150,7 +152,9 @@ public:
 
   void init_stats_response();
 
-  void send_error_response(const char* error_message);
+  void send_error_response(const char* error_code,
+                           const char* error_message,
+                           const char* resource_id);
 
   void send_success_response();