]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
handling more use cases for "shaping" the processed chunk by s3select per Trino request json_s3tests_and_more
authorgalsalomon66 <gal.salomon@gmail.com>
Sat, 15 Apr 2023 09:18:39 +0000 (12:18 +0300)
committergalsalomon66 <gal.salomon@gmail.com>
Sat, 15 Apr 2023 09:18:39 +0000 (12:18 +0300)
Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h

index e0cb55e1a1b93f3f0c117702989645341abf6a97..4a4e72ac9815eae25f362efa6e68cdff83fb8254 100644 (file)
@@ -272,7 +272,8 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_json_type(false),
   chunk_number(0),
   m_requested_range(0),
-  m_scan_offset(1024)
+  m_scan_offset(1024),
+  m_skip_next_chunk(false)
 {
   set_get_data(true);
   fp_get_obj_size = [&]() {
@@ -755,45 +756,57 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
 
 void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len)
 {
-//in case it is a scan range response, and row might be broken, this routine search for the end-of-row.
+//in case it is a scan range request and sent by Trino client.
+//this routine chops the start/end of chunks.
+//the purpose is to return "perfect" results, with no broken or missing lines.
+
   off_t new_offset = 0;
   if(m_scan_range_ind){//only upon range-scan
-  uint64_t sc=0;
-  uint64_t start =0;
+  int64_t sc=0;
+  int64_t start =0;
   const char* row_delimiter = m_row_delimiter.c_str();
    
     //chop the head of the first chunk.
     if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
-      char* p = (char*)it_cp;
+      char* p = const_cast<char*>(it_cp);
       while(strncmp(row_delimiter,p,1) && (p - it_cp) < len)p++;
       if(!strncmp(row_delimiter,p,1)){
        new_offset += (p - it_cp)+1;
       } 
     }
 
-    //chop the end of chunk    
-    //if it's the last chunk, reduce scan-offset and search for first row-delimiter 
+    //chop the end of the last chunk for this request
+    //if it's the last chunk, search for first row-delimiter for the following different use-cases
     if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ 
     //had pass the requested range, start to search for first delimiter
-      if(m_aws_response_handler.get_processed_size()){
-       start = m_aws_response_handler.get_processed_size() - m_requested_range;
+      if(m_aws_response_handler.get_processed_size()>m_requested_range){
+       //the previous chunk contain the complete request(all data) and an extra bytes.
+       //thus, search for the first row-delimiter
+       start = 0;
+      } else if(m_aws_response_handler.get_processed_size()){
+       //the *current* chunk contain the complete request in the middle of the chunk. 
+       //thus, search for the first row-delimiter after the complete request position
+       start = m_requested_range -  m_aws_response_handler.get_processed_size();
       } else {
+       //the current chunk is the first chunk and it contains complete request
        start = m_requested_range;
       }
 
       ldout(s->cct, 10) << "S3select: start scan from " << start << " len = " << len << dendl;
       for(sc=start;sc<len;sc++)//assumption : row-delimiter must exist or its end ebject
       {
-       char* p = (char*)it_cp + ofs + sc;
+       char* p = const_cast<char*>(it_cp) + ofs + sc;
        if(!strncmp(row_delimiter,p,1)){
              ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() <<  dendl;
              len = sc + 1;//+1 is for delimiter.  TODO what about m_object_size_for_processing (to update according to len)
+             //the end of row exist in current chunk.
+             //thus, the next chunk should be skipped
+             m_skip_next_chunk = true; 
              break;
        }
       }
       ldpp_dout(this, 10) << "len =" << len << dendl;
     }
-
     ofs += new_offset;
   }
 
@@ -805,7 +818,11 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
+
+  if(m_skip_next_chunk == true){
+    return status;
+  } 
+
   if (s->obj_size == 0 || m_object_size_for_processing == 0) {
     status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0);
     if (status<0){
@@ -933,7 +950,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
       if (m_end_scan_sz == -1){
                m_end_scan_sz = s->obj_size;
       }
-      if ((m_end_scan_sz - m_start_scan_sz)>s->obj_size){ //in the case user provides range bigger than object-size
+      if (static_cast<uint64_t>((m_end_scan_sz - m_start_scan_sz))>s->obj_size){ //in the case user provides range bigger than object-size
        m_object_size_for_processing = s->obj_size;
       } else {
        m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
index c694c770a1097ace39eba5df11634369bf72f97c..548c87182f6577a491723976e2d0c71e4e39fa74 100644 (file)
@@ -215,6 +215,7 @@ public:
   unsigned int chunk_number;
   size_t m_requested_range;
   size_t m_scan_offset;
+  bool m_skip_next_chunk;
 
   RGWSelectObj_ObjStore_S3();
   virtual ~RGWSelectObj_ObjStore_S3();