git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
refactor of s3select response handler, adding continuation-response to parquet flow... 57231/head
authorGal Salomon <gal.salomon@gmail.com>
Tue, 19 Mar 2024 11:45:58 +0000 (13:45 +0200)
committerGal Salomon <gal.salomon@gmail.com>
Thu, 2 May 2024 10:59:09 +0000 (13:59 +0300)
Bug fix (init_success_response).
Update the s3select submodule.

Fix for the JSON error flow.

Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
(cherry picked from commit 06eadd66db4eb0a8688ee39ae13093a68f0868b5)

src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h
src/s3select

index 2033044e34bc7fcfffaa17eaf8a831dbb80eb5f4..8c6dc542512b868c96472d577f327680c1aad124 100644 (file)
@@ -44,13 +44,13 @@ void aws_response_handler::push_header(const char* header_name, const char* head
   char x;
   short s;
   x = char(strlen(header_name));
-  m_buff_header.append(&x, sizeof(x));
-  m_buff_header.append(header_name);
+  get_buffer()->append(&x, sizeof(x));
+  get_buffer()->append(header_name);
   x = char(7);
-  m_buff_header.append(&x, sizeof(x));
+  get_buffer()->append(&x, sizeof(x));
   s = htons(uint16_t(strlen(header_value)));
-  m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
-  m_buff_header.append(header_value);
+  get_buffer()->append(reinterpret_cast<char*>(&s), sizeof(s));
+  get_buffer()->append(header_value);
 }
 
 #define IDX( x ) static_cast<int>( x )
@@ -65,7 +65,7 @@ int aws_response_handler::create_header_records()
   push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_continuation()
@@ -75,7 +75,7 @@ int aws_response_handler::create_header_continuation()
   push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
   //2
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_progress()
@@ -87,7 +87,7 @@ int aws_response_handler::create_header_progress()
   push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_stats()
@@ -99,7 +99,7 @@ int aws_response_handler::create_header_stats()
   push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_header_end()
@@ -109,7 +109,7 @@ int aws_response_handler::create_header_end()
   push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
   //2
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
 int aws_response_handler::create_error_header_records(const char* error_message)
@@ -122,10 +122,10 @@ int aws_response_handler::create_error_header_records(const char* error_message)
   push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
   //3
   push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
-  return m_buff_header.size();
+  return get_buffer()->size();
 }
 
-int aws_response_handler::create_message(u_int32_t header_len)
+int aws_response_handler::create_message(u_int32_t header_len,std::string *msg_string = nullptr)
 {
   //message description(AWS):
   //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
@@ -133,24 +133,27 @@ int aws_response_handler::create_message(u_int32_t header_len)
   //are created later to the produced SQL result, and actually wrapping the payload.
   auto push_encode_int = [&](u_int32_t s, int pos) {
     u_int32_t x = htonl(s);
-    sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
+    msg_string->replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
   };
+  
+  msg_string = (msg_string == nullptr) ? &sql_result : msg_string;
+
   u_int32_t total_byte_len = 0;
   u_int32_t preload_crc = 0;
   u_int32_t message_crc = 0;
-  total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
+  total_byte_len = msg_string->size() + 4; //the total is greater in 4 bytes than current size
   push_encode_int(total_byte_len, 0);
   push_encode_int(header_len, 4);
   crc32.reset();
-  crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
+  crc32 = std::for_each(msg_string->data(), msg_string->data() + 8, crc32); //crc for starting 8 bytes
   preload_crc = crc32();
   push_encode_int(preload_crc, 8);
   crc32.reset();
-  crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+  crc32 = std::for_each(msg_string->begin(), msg_string->end(), crc32); //crc for payload + checksum
   message_crc = crc32();
   u_int32_t x = htonl(message_crc);
-  sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
-  return sql_result.size();
+  msg_string->append(reinterpret_cast<char*>(&x), sizeof(x));
+  return msg_string->size();
 }
 
 void aws_response_handler::init_response()
@@ -161,58 +164,63 @@ void aws_response_handler::init_response()
 
 void aws_response_handler::init_success_response()
 {
-  m_buff_header.clear();
-  header_size = create_header_records();
-  sql_result.append(m_buff_header.c_str(), header_size);
-#ifdef PAYLOAD_TAG
-  sql_result.append(PAYLOAD_LINE);
-#endif
+  get_buffer()->clear();
+  m_success_header_size = create_header_records();
+  sql_result.append(get_buffer()->c_str(), m_success_header_size);
 }
 
 void aws_response_handler::send_continuation_response()
 {
-  sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  set_continue_buffer();
+  continue_result.resize(header_crc_size, '\0');
+  get_buffer()->clear();
   header_size = create_header_continuation();
-  sql_result.append(m_buff_header.c_str(), header_size);
-  int buff_len = create_message(header_size);
-  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  continue_result.append(get_buffer()->c_str(), header_size);
+  int buff_len = create_message(header_size,&continue_result);
+  s->formatter->write_bin_data(continue_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
+  get_buffer()->clear();
+  set_main_buffer();
 }
 
 void aws_response_handler::init_progress_response()
 {
   sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  get_buffer()->clear();
   header_size = create_header_progress();
-  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(get_buffer()->c_str(), header_size);
 }
 
 void aws_response_handler::init_stats_response()
 {
   sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  get_buffer()->clear();
   header_size = create_header_stats();
-  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(get_buffer()->c_str(), header_size);
 }
 
 void aws_response_handler::init_end_response()
 {
   sql_result.resize(header_crc_size, '\0');
-  m_buff_header.clear();
+  get_buffer()->clear();
   header_size = create_header_end();
-  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(get_buffer()->c_str(), header_size);
   int buff_len = create_message(header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::init_error_response(const char* error_message)
+void aws_response_handler::send_error_response(const char* error_message)
 {
-  //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
-  m_buff_header.clear();
+  //currently not in use. need to change the s3-test, this error-response raises a boto3 exception
+  error_result.resize(header_crc_size, '\0');
+  get_buffer()->clear();
   header_size = create_error_header_records(error_message);
-  sql_result.append(m_buff_header.c_str(), header_size);
+  error_result.append(get_buffer()->c_str(), header_size);
+
+  int buff_len = create_message(header_size,&error_result);
+  s->formatter->write_bin_data(error_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
 void aws_response_handler::send_success_response()
@@ -220,12 +228,12 @@ void aws_response_handler::send_success_response()
 #ifdef PAYLOAD_TAG
   sql_result.append(END_PAYLOAD_LINE);
 #endif
-  int buff_len = create_message(header_size);
+  int buff_len = create_message(m_success_header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
 }
 
-void aws_response_handler::send_error_response(const char* error_code,
+void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
     const char* error_message,
     const char* resource_id)
 {
@@ -263,7 +271,6 @@ void aws_response_handler::send_stats_response()
 }
 
 RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
-  m_buff_header(std::make_unique<char[]>(1000)),
   m_scan_range_ind(false),
   m_start_scan_sz(0),
   m_end_scan_sz(0),
@@ -299,6 +306,11 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
     m_aws_response_handler.send_success_response();
     return 0;
   };
+  fp_s3select_continue = [this](std::string& result) {
+    fp_chunked_transfer_encoding();
+    m_aws_response_handler.send_continuation_response();
+    return 0;
+  };
 
   fp_debug_mesg = [&](const char* mesg){
     ldpp_dout(this, 10) << mesg << dendl;
@@ -369,9 +381,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   int status = 0;
   uint32_t length_before_processing, length_post_processing;
   csv_object::csv_defintions csv;
-  const char* s3select_syntax_error = "s3select-Syntax-Error";
-  const char* s3select_resource_id = "resource-id";
-  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
 
   s3select_syntax.parse_query(query);
   if (m_row_delimiter.size()) {
@@ -408,14 +417,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   } else if(m_header_info.compare("USE")==0) {
     csv.use_header_info=true;
   }
-  m_s3_csv_object.set_external_debug_system(fp_debug_mesg);
-  m_s3_csv_object.set_result_formatters(fp_s3select_result_format,fp_result_header_format);
+
   m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
+
+  m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+                                               fp_s3select_result_format,
+                                               fp_result_header_format,
+                                               fp_debug_mesg);
+
   if (s3select_syntax.get_error_description().empty() == false) {
     //error-flow (syntax-error)
-    m_aws_response_handler.send_error_response(s3select_syntax_error,
-        s3select_syntax.get_error_description().c_str(),
-        s3select_resource_id);
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
     ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
     return -1;
@@ -432,9 +444,8 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
 
     if (status < 0) {
       //error flow(processing-time)
-      m_aws_response_handler.send_error_response(s3select_processTime_error,
-          m_s3_csv_object.get_error_description().c_str(),
-          s3select_resource_id);
+      m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
+      
       ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
       return -1;
     }
@@ -442,8 +453,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   }
   if ((length_post_processing-length_before_processing) != 0) {
     ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
-  } else {
-    m_aws_response_handler.send_continuation_response();
   }
   ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
   if (enable_progress == true) {
@@ -461,7 +470,12 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   if (!m_s3_parquet_object.is_set()) {
     //parsing the SQL statement.
     s3select_syntax.parse_query(m_sql_query.c_str());
-    //m_s3_parquet_object.set_external_debug_system(fp_debug_mesg);
+
+  m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
+                                               fp_s3select_result_format,
+                                               fp_result_header_format,
+                                               fp_debug_mesg);
+
     try {
       //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object 
       m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
@@ -475,19 +489,21 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   }
   if (s3select_syntax.get_error_description().empty() == false) {
     //the SQL statement failed the syntax parser
-    fp_result_header_format(m_aws_response_handler.get_sql_result());
-    m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
-    fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+    fp_chunked_transfer_encoding();
+    m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
     ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
     status = -1;
   } else {
     fp_result_header_format(m_aws_response_handler.get_sql_result());
     //at this stage the Parquet-processing "takes control", it keep calling to s3-range-request according to the SQL statement.
-    status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
+    status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
     if (status < 0) {
-      m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
-      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
-      ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
+
+      fp_chunked_transfer_encoding();
+      m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
+      return -1;
     }
   }
 #endif
@@ -498,17 +514,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
 {
   int status = 0;
   
-  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
-  const char* s3select_syntax_error = "s3select-Syntax-Error";
-  const char* s3select_resource_id = "resourcse-id";
-  const char* s3select_json_error = "json-Format-Error";
+  m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+                                               fp_s3select_result_format,
+                                               fp_result_header_format,
+                                               fp_debug_mesg);
 
   m_aws_response_handler.init_response();
 
   //the JSON data-type should be(currently) only DOCUMENT
   if (m_json_datatype.compare("DOCUMENT") != 0) {
     const char* s3select_json_error_msg = "s3-select query: wrong json dataType should use DOCUMENT; ";
-    m_aws_response_handler.send_error_response(s3select_json_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_json_error,
       s3select_json_error_msg,
       s3select_resource_id);
     ldpp_dout(this, 10) << s3select_json_error_msg << dendl;
@@ -519,7 +535,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   s3select_syntax.parse_query(m_sql_query.c_str());
   if (s3select_syntax.get_error_description().empty() == false) {
   //SQL statement is wrong(syntax).
-    m_aws_response_handler.send_error_response(s3select_syntax_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
       s3select_syntax.get_error_description().c_str(),
       s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
@@ -541,7 +557,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   } catch(base_s3select_exception& e) {
     ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
     m_aws_response_handler.get_sql_result().append(e.what());
-    m_aws_response_handler.send_error_response(s3select_processTime_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
        e.what(),
        s3select_resource_id);
     return -EINVAL;
@@ -550,7 +566,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
   if (status < 0) {
     //error flow(processing-time)
-    m_aws_response_handler.send_error_response(s3select_processTime_error,
+    m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
        m_s3_json_object.get_error_description().c_str(),
        s3select_resource_id);
     ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
@@ -560,8 +576,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
 
   if (length_post_processing-length_before_processing != 0) {
     m_aws_response_handler.send_success_response();
-  } else {
-    m_aws_response_handler.send_continuation_response();
   }
   if (enable_progress == true) {
     m_aws_response_handler.init_progress_response();
index f4fd59a29ac8e9732b4c5c5c0aceacbcf910ac22..f6b7b4d83d32050334a9290f6b342ec7a5a2c226 100644 (file)
@@ -47,15 +47,22 @@ class aws_response_handler
 {
 
 private:
-  std::string sql_result;
+  std::string sql_result;//SQL result buffer
+  std::string continue_result;//CONT-MESG buffer
+  std::string error_result;//SQL error buffer
   req_state* s;
   uint32_t header_size;
   // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
   boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
   RGWOp* m_rgwop;
-  std::string m_buff_header;
+  std::string m_buff_header_;//response buffer
+  std::string m_buff_continue;//response buffer
+  //m_buff_ptr : a switch between m_buff_header_ and m_buff_continue
+  std::string* m_buff_ptr=nullptr;
   uint64_t total_bytes_returned;
   uint64_t processed_size;
+  uint32_t m_success_header_size;
+
 
   enum class header_name_En {
     EVENT_TYPE,
@@ -86,7 +93,7 @@ private:
 
   void push_header(const char* header_name, const char* header_value);
 
-  int create_message(u_int32_t header_len);
+  int create_message(u_int32_t header_len,std::string*);
 
 public:
   aws_response_handler(req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
@@ -143,7 +150,7 @@ public:
 
   void init_stats_response();
 
-  void init_error_response(const char* error_message);
+  void send_error_response(const char* error_message);
 
   void send_success_response();
 
@@ -151,10 +158,26 @@ public:
 
   void send_stats_response();
 
-  void send_error_response(const char* error_code,
+  void send_error_response_rgw_formatter(const char* error_code,
                            const char* error_message,
                            const char* resource_id);
 
+  std::string* get_buffer()
+  {
+    if(!m_buff_ptr) set_main_buffer();
+    return m_buff_ptr;
+  }
+
+  void set_continue_buffer()
+  {
+    m_buff_ptr = &m_buff_continue;
+  }
+
+  void set_main_buffer()
+  {
+    m_buff_ptr = &m_buff_header_;
+  }
+
 }; //end class aws_response_handler
 
 class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
@@ -175,7 +198,6 @@ private:
   std::string m_row_delimiter;
   std::string m_compression_type;
   std::string m_escape_char;
-  std::unique_ptr<char[]>  m_buff_header;
   std::string m_header_info;
   std::string m_sql_query;
   std::string m_enable_progress;
@@ -207,10 +229,16 @@ private:
   std::string range_req_str;
   std::function<int(std::string&)> fp_result_header_format;
   std::function<int(std::string&)> fp_s3select_result_format;
+  std::function<int(std::string&)> fp_s3select_continue;
   std::function<void(const char*)> fp_debug_mesg;
   std::function<void(void)> fp_chunked_transfer_encoding;
   int m_header_size;
 
+  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
+  const char* s3select_syntax_error = "s3select-Syntax-Error";
+  const char* s3select_resource_id = "resourcse-id";
+  const char* s3select_json_error = "json-Format-Error";
+
 public:
   unsigned int chunk_number;
   size_t m_requested_range;
index 232ac7061ec3ab52fb4a76264a11e3c23af2972d..eb40d36e1090a8e02b610f67b7edb902d59f2bbe 160000 (submodule)
@@ -1 +1 @@
-Subproject commit 232ac7061ec3ab52fb4a76264a11e3c23af2972d
+Subproject commit eb40d36e1090a8e02b610f67b7edb902d59f2bbe