]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
adding s3test albin/json-op-serial
authorgalsalomon66 <gal.salomon@gmail.com>
Fri, 10 Mar 2023 12:27:05 +0000 (14:27 +0200)
committergalsalomon66 <gal.salomon@gmail.com>
Tue, 9 May 2023 12:56:31 +0000 (15:56 +0300)
modify json chunk processing function to handle offset/length as csv-processing
a fix valgrind :: Conditional jump or move depends on uninitialised value
upon using Trino the Trino-server issue multiple requests per single query,upon completion of all requests
the results are merged (by Trino).  these request splits the input into equal parts; the RGW side should be aligned with Trino expectations(for result).
fixing the main routine for shaping the chunk (range-scan) for Trino processing
upon removing the payload-TAG, it need to change the response element index
handling more use cases for "shaping" the processed chunk by s3select per Trino request
re-shape the processed chunk only upon Trino sent the request
bug-fix: the chunk offset was not handle correctly
bug-fix: progress-message calcualation
modifying the range-request boundaries only upon Trino request.

Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
qa/rgw/s3tests-branch.yaml
src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h

index ef6819c87e06a4c1df50c8021ab245d27ca25225..7ebb214716eeff91afd4a536e686af22c53cfe36 100644 (file)
@@ -1,4 +1,4 @@
 overrides:
   s3tests:
-    force-branch: ceph-master
-    # git_remote: https://github.com/ceph/
+    force-branch: json_s3tests
+    git_remote: https://github.com/galsalomon66/
index 3a11ddf839c6c989b71a4fe0cbdd551fbbfbd9ae..7d93569f3a4cf1fa2223354d14f64cd2e83b8751 100644 (file)
@@ -164,7 +164,9 @@ void aws_response_handler::init_success_response()
   m_buff_header.clear();
   header_size = create_header_records();
   sql_result.append(m_buff_header.c_str(), header_size);
+#ifdef PAYLOAD_TAG
   sql_result.append(PAYLOAD_LINE);
+#endif
 }
 
 void aws_response_handler::send_continuation_response()
@@ -215,7 +217,9 @@ void aws_response_handler::init_error_response(const char* error_message)
 
 void aws_response_handler::send_success_response()
 {
+#ifdef PAYLOAD_TAG
   sql_result.append(END_PAYLOAD_LINE);
+#endif
   int buff_len = create_message(header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
@@ -266,7 +270,11 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_object_size_for_processing(0),
   m_parquet_type(false),
   m_json_type(false),
-  chunk_number(0)
+  chunk_number(0),
+  m_requested_range(0),
+  m_scan_offset(1024),
+  m_skip_next_chunk(false),
+  m_is_trino_request(false)
 {
   set_get_data(true);
   fp_get_obj_size = [&]() {
@@ -339,6 +347,16 @@ int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
     ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
     return -1;
   }
+  const auto& m = s->info.env->get_map();
+  auto user_agent = m.find("HTTP_USER_AGENT"); {
+  if (user_agent != m.end()){
+    if (user_agent->second.find("Trino") != std::string::npos){
+       m_is_trino_request = true;
+       ldpp_dout(this, 10) << "s3-select query: request sent by Trino." << dendl;
+      }
+    }
+  }
+
   int status = handle_aws_cli_parameters(m_sql_query);
   if (status<0) {
     return status;
@@ -368,11 +386,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   if (m_escape_char.size()) {
     csv.escape_char = *m_escape_char.c_str();
   }
-  if (m_enable_progress.compare("true")==0) {
-    enable_progress = true;
-  } else {
-    enable_progress = false;
-  }
   if (output_row_delimiter.size()) {
     csv.output_row_delimiter = *output_row_delimiter.c_str();
   }
@@ -429,7 +442,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   }
   if ((length_post_processing-length_before_processing) != 0) {
     ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
-    ldpp_dout(this, 10) << "s3-select: sql-result{" << m_aws_response_handler.get_sql_result() << "}" << dendl;
   } else {
     m_aws_response_handler.send_continuation_response();
   }
@@ -638,6 +650,11 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
       m_end_scan_sz = std::numeric_limits<std::int64_t>::max();
     } 
   }
+  if (m_enable_progress.compare("true")==0) {
+    enable_progress = true;
+  } else {
+    enable_progress = false;
+  }
   return 0;
 }
 
@@ -710,8 +727,18 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     } else { 
        //CSV or JSON processing
        if (m_scan_range_ind) {
-         //scan-range
-         range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y);
+
+         m_requested_range = (m_end_scan_sz - m_start_scan_sz);
+           
+         if(m_is_trino_request){
+         // fetch more than requested(m_scan_offset), that additional bytes are scanned for end of row, 
+         // thus the additional length will be processed, and no broken row for Trino.
+         // assumption: row is smaller than m_scan_offset. (a different approach is to request for additional range)
+           range_request(m_start_scan_sz, m_requested_range + m_scan_offset, nullptr, y);
+         } else {
+           range_request(m_start_scan_sz, m_requested_range, nullptr, y);
+         }
+
        } else {
          RGWGetObj::execute(y);
        }
@@ -743,10 +770,80 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
     return 0;
 }
 
+void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len)
+{
+//in case it is a scan range request and sent by Trino client.
+//this routine chops the start/end of chunks.
+//the purpose is to return "perfect" results, with no broken or missing lines.
+
+  off_t new_offset = 0;
+  if(m_scan_range_ind){//only upon range-scan
+  int64_t sc=0;
+  int64_t start =0;
+  const char* row_delimiter = m_row_delimiter.c_str();
+   
+    ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should modified." << dendl;
+
+    //chop the head of the first chunk and only upon the slice does not include the head of the object.
+    if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+      char* p = const_cast<char*>(it_cp+ofs);
+      while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++;
+      if(!strncmp(row_delimiter,p,1)){
+       new_offset += (p - (it_cp+ofs))+1;
+      } 
+    }
+
+    //RR : end of the range-request. the original request sent by Trino client
+    //RD : row-delimiter
+    //[ ... ] : chunk boundaries 
+
+    //chop the end of the last chunk for this request
+    //if it's the last chunk, search for first row-delimiter for the following different use-cases
+    if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ 
+    //had pass the requested range, start to search for first delimiter
+      if(m_aws_response_handler.get_processed_size()>m_requested_range){
+       //the previous chunk contain the complete request(all data) and an extra bytes.
+       //thus, search for the first row-delimiter
+       //[:previous (RR) ... ][:current (RD) ]
+       start = 0;
+      } else if(m_aws_response_handler.get_processed_size()){
+       //the *current* chunk contain the complete request in the middle of the chunk. 
+       //thus, search for the first row-delimiter after the complete request position
+       //[:current (RR) .... (RD) ]
+       start = m_requested_range -  m_aws_response_handler.get_processed_size();
+      } else {
+       //the current chunk is the first chunk and it contains complete request
+       //[:current:first-chunk (RR) .... (RD) ]
+       start = m_requested_range;
+      }
+
+      for(sc=start;sc<len;sc++)//assumption : row-delimiter must exist or its end ebject
+      {
+       char* p = const_cast<char*>(it_cp) + ofs + sc;
+       if(!strncmp(row_delimiter,p,1)){
+             ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() <<  dendl;
+             len = sc + 1;//+1 is for delimiter.  TODO what about m_object_size_for_processing (to update according to len)
+             //the end of row exist in current chunk.
+             //thus, the next chunk should be skipped
+             m_skip_next_chunk = true; 
+             break;
+       }
+      }
+    }
+    ofs += new_offset;
+  }
+
+  ldout(s->cct, 10) << "S3select: shape_chunk_per_trino_requests:update progress len = " << len << dendl;
+  len -= new_offset;
+}
+
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
+  if(m_skip_next_chunk == true){
+    return status;
+  } 
+
   if (s->obj_size == 0 || m_object_size_for_processing == 0) {
     status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0);
     if (status<0){
@@ -754,38 +851,50 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
     }
   } else {
     auto bl_len = bl.get_num_buffers();
-    int i=0;
+    int buff_no=0;
     for(auto& it : bl.buffers()) {
-      ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+      ldpp_dout(this, 10) << "s3select :processing segment " << buff_no << " out of " << bl_len << " off " << ofs
                           << " len " << len << " obj-size " << m_object_size_for_processing << dendl;
       if (it.length() == 0 || len == 0) {
-        ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
+        ldpp_dout(this, 10) << "s3select :it->_len is zero. segment " << buff_no << " out of " << bl_len
                             <<  " obj-size " << m_object_size_for_processing << dendl;
         continue;
       }
-      //NOTE: the it.length() must be used (not len)
-      m_aws_response_handler.update_processed_size(it.length());
 
       if((ofs + len) > it.length()){
-       ldpp_dout(this, 10) << "offset and lenghth may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+       ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
        ofs = 0;
        len = it.length();
       }
-      status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] +ofs, len);
-      if (status<0) {
-       return -EINVAL;
-      }
-      if (m_s3_csv_object.is_sql_limit_reached()) {
-       break;
-      }
-      i++;
+
+    if(m_is_trino_request){
+      shape_chunk_per_trino_requests(&(it)[0], ofs, len); 
     }
-  }
-  if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
+
+    ldpp_dout(this, 10) << "s3select: chunk:  ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << " m_object_size_for_processing = " << m_object_size_for_processing << dendl;
+    
+    m_aws_response_handler.update_processed_size(it.length());//NOTE : to run analysis to validate len is aligned with m_processed_bytes
+    status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len);
+    if (status<0) {
+         return -EINVAL;
+    }
+    if (m_s3_csv_object.is_sql_limit_reached()) {
+         break;
+    }
+    buff_no++;
+  }//for
+  }//else
+
+  ldpp_dout(this, 10) << "s3select : m_aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size() 
+  << " m_object_size_for_processing " << uint64_t(m_object_size_for_processing) << dendl;
+
+  if (m_aws_response_handler.get_processed_size() >= uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
     if (status >=0) {
       m_aws_response_handler.init_stats_response();
       m_aws_response_handler.send_stats_response();
       m_aws_response_handler.init_end_response();
+      ldpp_dout(this, 10) << "s3select : reached the end of query request : aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size()
+      << "m_object_size_for_processing : " << m_object_size_for_processing << dendl;
     }
     if (m_s3_csv_object.is_sql_limit_reached()) {
     //stop fetching chunks
@@ -793,6 +902,7 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
     status = -ENOENT;
     }
   }
+
   return status;
 }
 
@@ -818,8 +928,15 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
                             <<  " obj-size " << m_object_size_for_processing << dendl;
         continue;
       }
+
+      if((ofs + len) > it.length()){
+       ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+       ofs = 0;
+       len = it.length();
+      }
+
       m_aws_response_handler.update_processed_size(len);
-      status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0], len);
+      status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0] + ofs, len);
       if (status<0) {
        status = -EINVAL;
         break;
@@ -828,8 +945,8 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
        break;
       }
       i++;
-    }
-  }
+    }//for
+  }//else
 
   if (status>=0 && (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_json_object.is_sql_limit_reached())) {
     //flush the internal JSON buffer(upon last chunk)
@@ -860,7 +977,11 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
       if (m_end_scan_sz == -1){
                m_end_scan_sz = s->obj_size;
       }
-      m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+      if (static_cast<uint64_t>((m_end_scan_sz - m_start_scan_sz))>s->obj_size){ //in the case user provides range bigger than object-size
+       m_object_size_for_processing = s->obj_size;
+      } else {
+       m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+      }
   }
   if (!m_aws_response_handler.is_set()) {
     m_aws_response_handler.set(s, this);
index 7ac9167caee4fa0f75c9236164f2329788f637bd..fa595b0da599fa2dc8a78f17e98e652638b389b3 100644 (file)
@@ -213,6 +213,10 @@ private:
 
 public:
   unsigned int chunk_number;
+  size_t m_requested_range;
+  size_t m_scan_offset;
+  bool m_skip_next_chunk;
+  bool m_is_trino_request;
 
   RGWSelectObj_ObjStore_S3();
   virtual ~RGWSelectObj_ObjStore_S3();
@@ -249,5 +253,6 @@ private:
   std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
   std::function<size_t(void)> fp_get_obj_size;
 
+  void shape_chunk_per_trino_requests(const char*, off_t& ofs, off_t& len);
 };