]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
s3select submodule.
authorGal Salomon <gal.salomon@gmail.com>
Mon, 2 Sep 2024 17:19:54 +0000 (20:19 +0300)
committerGal Salomon <gal.salomon@gmail.com>
Mon, 30 Sep 2024 13:06:39 +0000 (16:06 +0300)
RGW option per parquet read-buffer.
identation.
alignment with s3select updated APIs.

Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
src/common/options/rgw.yaml.in
src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h
src/s3select

index 09d37dfcd826135182e27366e9251a168f798d96..c47eafe89f91bf535beeb88e8fae401e6a2337a0 100644 (file)
@@ -59,6 +59,14 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_parquet_buffer_size
+  type: size
+  level: advanced
+  desc: the Maximum parquet buffer size, a limit to memory consumption for parquet reading operations.
+  default: 1_G
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_rados_tracing
   type: bool
   level: advanced
index 800d276a6aab1c75c4ade9ef8d73e21dbd771d2f..79e6c53167ec964dfc20a059b86e6fbf9b36fcd7 100644 (file)
@@ -287,6 +287,7 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_object_size_for_processing(0),
   m_parquet_type(false),
   m_json_type(false),
+  m_outputFormat(OutputFormat::CSV),
   chunk_number(0),
   m_requested_range(0),
   m_scan_offset(1024),
@@ -344,7 +345,7 @@ RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
 
 int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
 {
-  if(m_s3select_query.empty() == false) {
+  if (m_s3select_query.empty() == false) {
     return 0;
   }
 #ifndef _ARROW_EXIST
@@ -416,17 +417,19 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   if (output_escape_char.size()) {
     csv.output_escape_char = *output_escape_char.c_str();
   }
-  if(output_quote_fields.compare("ALWAYS") == 0) {
+  if (output_quote_fields.compare("ALWAYS") == 0) {
     csv.quote_fields_always = true;
-  } else if(output_quote_fields.compare("ASNEEDED") == 0) {
+  } else if (output_quote_fields.compare("ASNEEDED") == 0) {
     csv.quote_fields_asneeded = true;
   }
-  if(m_header_info.compare("IGNORE")==0) {
+  if (m_header_info.compare("IGNORE")==0) {
     csv.ignore_header_info=true;
-  } else if(m_header_info.compare("USE")==0) {
+  } else if (m_header_info.compare("USE")==0) {
     csv.use_header_info=true;
   }
-
+  if (m_outputFormat == OutputFormat::JSON) {
+    csv.output_json_format = true;
+  }
   m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
 
   m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
@@ -478,6 +481,10 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   if (!m_s3_parquet_object.is_set()) {
     //parsing the SQL statement.
     s3select_syntax.parse_query(m_sql_query.c_str());
+    parquet_object::csv_definitions parquet;
+    if (m_outputFormat == OutputFormat::JSON) {
+    parquet.output_json_format = true;
+    }
 
   m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
                                                fp_s3select_result_format,
@@ -486,7 +493,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
 
     try {
       //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object 
-      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
+      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet);
     } catch(base_s3select_exception& e) {
       ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
       fp_result_header_format(m_aws_response_handler.get_sql_result());
@@ -524,6 +531,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
                                                fp_s3select_result_format,
                                                fp_result_header_format,
                                                fp_debug_mesg);
+  json_object::csv_definitions json;
 
   m_aws_response_handler.init_response();
 
@@ -536,6 +544,10 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
     return -EINVAL;
   } 
 
+  if (m_outputFormat == OutputFormat::JSON) {
+    json.output_json_format = true;
+  }
+
   //parsing the SQL statement
   s3select_syntax.parse_query(m_sql_query.c_str());
   if (s3select_syntax.get_error_description().empty() == false) {
@@ -547,8 +559,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   }
     
   //initializing json processor
-  json_object::csv_definitions output_definition;
-  m_s3_json_object.set_json_query(&s3select_syntax,output_definition);
+  m_s3_json_object.set_json_query(&s3select_syntax, json);
 
   if (input == nullptr) {
     input = "";
@@ -618,6 +629,10 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
     ldpp_dout(this, 10) << "s3select: engine is set to process Parquet objects" << dendl;
   }
 
+  if (m_s3select_query.find(output_tag+"><JSON") != std::string::npos) {
+    m_outputFormat = OutputFormat::JSON;
+  }
+
   extract_by_tag(m_s3select_query, "Expression", sql_query);
   extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
   size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
@@ -706,6 +721,7 @@ int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff
   RGWGetObj::parse_range();
   requested_buffer.clear();
   m_request_range = len;
+  m_aws_response_handler.update_processed_size(len);
   ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
   RGWGetObj::execute(y);
   if (buff) {
@@ -730,7 +746,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
   }
 
-  if(s->cct->_conf->rgw_disable_s3select == true)
+  if (s->cct->_conf->rgw_disable_s3select == true)
   {
       std::string error_msg="s3select : is disabled by rgw_disable_s3select configuration parameter";
       ldpp_dout(this, 10) << error_msg << dendl;
@@ -754,7 +770,16 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
       ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
       op_ret = -ERR_INVALID_REQUEST;
     } else {
-      ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
+      {//status per amount of processed data
+       //TODO add number of calls to range_request
+      m_aws_response_handler.init_stats_response();
+      m_aws_response_handler.send_stats_response();
+      m_aws_response_handler.init_end_response();
+      ldpp_dout(this, 10) << "s3select : reached the end of parquet query request : aws_response_handler.get_processed_size() " 
+      << m_aws_response_handler.get_processed_size()
+      << "m_object_size_for_processing : " << m_object_size_for_processing << dendl;
+      } 
+      ldout(s->cct, 10) << "S3select: complete parquet query with success " << dendl;
     }
     } else { 
        //CSV or JSON processing
@@ -762,7 +787,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 
          m_requested_range = (m_end_scan_sz - m_start_scan_sz);
            
-         if(m_is_trino_request){
+         if (m_is_trino_request){
          // fetch more than requested(m_scan_offset), that additional bytes are scanned for end of row, 
          // thus the additional length will be processed, and no broken row for Trino.
          // assumption: row is smaller than m_scan_offset. (a different approach is to request for additional range)
@@ -782,6 +807,7 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
     fp_chunked_transfer_encoding();
     size_t append_in_callback = 0;
     int part_no = 1;
+    parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size);
     //concat the requested buffer
     for (auto& it : bl.buffers()) {
       if (it.length() == 0) {
@@ -789,6 +815,11 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
       }
       append_in_callback += it.length();
       ldout(s->cct, 10) << "S3select: part " << part_no++ << " it.length() = " << it.length() << dendl;
+      if ((ofs + len) > it.length()){
+       ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+       ofs = 0;
+       len = it.length();
+      }
       requested_buffer.append(&(it)[0]+ofs, len);
     }
     ldout(s->cct, 10) << "S3select:append_in_callback = " << append_in_callback << dendl;
@@ -809,7 +840,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 //the purpose is to return "perfect" results, with no broken or missing lines.
 
   off_t new_offset = 0;
-  if(m_scan_range_ind){//only upon range-scan
+  if (m_scan_range_ind){//only upon range-scan
   int64_t sc=0;
   int64_t start =0;
   const char* row_delimiter = m_row_delimiter.c_str();
@@ -817,10 +848,10 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
     ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should modified." << dendl;
 
     //chop the head of the first chunk and only upon the slice does not include the head of the object.
-    if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+    if (m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
       char* p = const_cast<char*>(it_cp+ofs);
       while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++;
-      if(!strncmp(row_delimiter,p,1)){
+      if (!strncmp(row_delimiter,p,1)){
        new_offset += (p - (it_cp+ofs))+1;
       } 
     }
@@ -831,14 +862,14 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 
     //chop the end of the last chunk for this request
     //if it's the last chunk, search for first row-delimiter for the following different use-cases
-    if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ 
+    if ((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ 
     //had pass the requested range, start to search for first delimiter
-      if(m_aws_response_handler.get_processed_size()>m_requested_range){
+      if (m_aws_response_handler.get_processed_size()>m_requested_range){
        //the previous chunk contain the complete request(all data) and an extra bytes.
        //thus, search for the first row-delimiter
        //[:previous (RR) ... ][:current (RD) ]
        start = 0;
-      } else if(m_aws_response_handler.get_processed_size()){
+      } else if (m_aws_response_handler.get_processed_size()){
        //the *current* chunk contain the complete request in the middle of the chunk. 
        //thus, search for the first row-delimiter after the complete request position
        //[:current (RR) .... (RD) ]
@@ -852,7 +883,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
       for(sc=start;sc<len;sc++)//assumption : row-delimiter must exist or its end ebject
       {
        char* p = const_cast<char*>(it_cp) + ofs + sc;
-       if(!strncmp(row_delimiter,p,1)){
+       if (!strncmp(row_delimiter,p,1)){
              ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() <<  dendl;
              len = sc + 1;//+1 is for delimiter.  TODO what about m_object_size_for_processing (to update according to len)
              //the end of row exist in current chunk.
@@ -872,7 +903,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
-  if(m_skip_next_chunk == true){
+  if (m_skip_next_chunk == true){
     return status;
   } 
 
@@ -894,13 +925,13 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
       }
 
 
-      if(ofs > it.length()){
+      if (ofs > it.length()){
       //safety check
        ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
        ofs = 0;
       }
 
-    if(m_is_trino_request){
+    if (m_is_trino_request){
       //TODO replace len with it.length() ? ; test Trino flow with compressed objects.
       //is it possible to send get-by-ranges? in parallel?
       shape_chunk_per_trino_requests(&(it)[0], ofs, len); 
@@ -964,7 +995,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
         continue;
       }
 
-      if((ofs + len) > it.length()){
+      if ((ofs + len) > it.length()){
        ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
        ofs = 0;
        len = it.length();
index 7beac4f4a5d881e60d6f93c5f7634875db6dfd5c..87d180acebc0f32f844c88d72b6ab952e6787d64 100644 (file)
@@ -241,6 +241,11 @@ private:
   const char* s3select_json_error = "InvalidJsonType";
 
 public:
+  enum class OutputFormat {
+            CSV,
+            JSON
+        };
+  OutputFormat m_outputFormat;
   unsigned int chunk_number;
   size_t m_requested_range;
   size_t m_scan_offset;
index 9b9f35743ef2a1828e7a0ec26ffb02711e7e3d7d..a8cafe89608169bc8cc229f9ed14e022e43149b8 160000 (submodule)
@@ -1 +1 @@
-Subproject commit 9b9f35743ef2a1828e7a0ec26ffb02711e7e3d7d
+Subproject commit a8cafe89608169bc8cc229f9ed14e022e43149b8