]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
fixing the main routine for shaping the chunk (range-scan) for Trino processing
authorgalsalomon66 <gal.salomon@gmail.com>
Tue, 4 Apr 2023 17:00:23 +0000 (20:00 +0300)
committergalsalomon66 <gal.salomon@gmail.com>
Mon, 10 Apr 2023 19:19:13 +0000 (22:19 +0300)
Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h

index 8c8a5ef0b29e2c3ba11ab05889342964d8e3b9a0..e0cb55e1a1b93f3f0c117702989645341abf6a97 100644 (file)
@@ -164,7 +164,9 @@ void aws_response_handler::init_success_response()
   m_buff_header.clear();
   header_size = create_header_records();
   sql_result.append(m_buff_header.c_str(), header_size);
-  //sql_result.append(PAYLOAD_LINE); //TODO add switch
+#ifdef PAYLOAD_TAG
+  sql_result.append(PAYLOAD_LINE);
+#endif
 }
 
 void aws_response_handler::send_continuation_response()
@@ -215,7 +217,9 @@ void aws_response_handler::init_error_response(const char* error_message)
 
 void aws_response_handler::send_success_response()
 {
-  //sql_result.append(END_PAYLOAD_LINE);
+#ifdef PAYLOAD_TAG
+  sql_result.append(END_PAYLOAD_LINE);
+#endif
   int buff_len = create_message(header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
@@ -267,7 +271,8 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_parquet_type(false),
   m_json_type(false),
   chunk_number(0),
-  m_requested_range(0)
+  m_requested_range(0),
+  m_scan_offset(1024)
 {
   set_get_data(true);
   fp_get_obj_size = [&]() {
@@ -711,16 +716,12 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     } else { 
        //CSV or JSON processing
        if (m_scan_range_ind) {
-         //scan-range
-         //TODO add 1024 to m_end_scan_sz.  and search for first row-delimiter starting m_end_scan_sz(before adding 1024)
-         size_t scan_offset = 1024; //TODO data member
-
-         if(s->obj_size &&  ((m_end_scan_sz + scan_offset) > s->obj_size)){
-           scan_offset = s->obj_size - m_end_scan_sz;
-         }
-         m_requested_range = (m_end_scan_sz - m_start_scan_sz) + scan_offset;
-         range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + scan_offset, nullptr, y);
-         //TODO a second range request?? csv_processing may indicate a need for a second round(what about another one?)
+
+         m_requested_range = (m_end_scan_sz - m_start_scan_sz);
+         // fetch more than requested(m_scan_offset), that additional bytes are scanned for end of row, 
+         // thus the additional length will be processed, and no broken row for Trino.
+         // assumption: row is smaller than m_scan_offset. (a different approach is to request for additional range)
+         range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + m_scan_offset, nullptr, y);
        } else {
          RGWGetObj::execute(y);
        }
@@ -751,31 +752,55 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
     }
     return 0;
 }
-#if 1
-void RGWSelectObj_ObjStore_S3::continue_to_end_of_csv_row(const char* it_cp, off_t ofs, off_t& len)
-{
-//in case it is a scan range response, and row-delimiter(\n) is not included in last row
-//in needs to fetch another range(defualt = 1024, maybe its better to that on first call)
-  if(m_scan_range_ind){
-  int sc=0;
-  size_t scan_offset = 1024; //TODO data member 
-       
-    // if it's the last chunk , reduce scan-offset and search for first "\n"
-    if((m_aws_response_handler.get_processed_size()+len)>=m_requested_range)
-    {
-      for(sc=(len - scan_offset); sc < len; sc++)
+
+void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len)
+{
+//in case it is a scan range response, and row might be broken, this routine search for the end-of-row.
+  off_t new_offset = 0;
+  if(m_scan_range_ind){//only upon range-scan
+  uint64_t sc=0;
+  uint64_t start =0;
+  const char* row_delimiter = m_row_delimiter.c_str();
+   
+    //chop the head of the first chunk.
+    if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+      char* p = (char*)it_cp;
+      while(strncmp(row_delimiter,p,1) && (p - it_cp) < len)p++;
+      if(!strncmp(row_delimiter,p,1)){
+       new_offset += (p - it_cp)+1;
+      } 
+    }
+
+    //chop the end of chunk    
+    //if it's the last chunk, reduce scan-offset and search for first row-delimiter 
+    if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ 
+    //had pass the requested range, start to search for first delimiter
+      if(m_aws_response_handler.get_processed_size()){
+       start = m_aws_response_handler.get_processed_size() - m_requested_range;
+      } else {
+       start = m_requested_range;
+      }
+
+      ldout(s->cct, 10) << "S3select: start scan from " << start << " len = " << len << dendl;
+      for(sc=start;sc<len;sc++)//assumption : row-delimiter must exist or its end ebject
       {
-       char* p=(char*)it_cp + ofs + sc;
-       if(!strncmp("\n",p,1))
+       char* p = (char*)it_cp + ofs + sc;
+       if(!strncmp(row_delimiter,p,1)){
+             ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() <<  dendl;
+             len = sc + 1;//+1 is for delimiter.  TODO what about m_object_size_for_processing (to update according to len)
              break;
+       }
       }
-      len = sc + 1;//+1 for delimiter
       ldpp_dout(this, 10) << "len =" << len << dendl;
     }
+
+    ofs += new_offset;
   }
-  m_aws_response_handler.update_processed_size(len);
+
+  ldout(s->cct, 10) << "S3select: shape_chunk_per_trino_requests:update progress len = " << len << dendl;
+  m_aws_response_handler.update_processed_size(len);//TODO : to run analysis to validate len is aligned with m_processed_bytes
+  len -= new_offset;
 }
-#endif 
 
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
@@ -804,8 +829,7 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
        len = it.length();
       }
 
-    //TODO : should run only for the last element of bufferlist
-    continue_to_end_of_csv_row(&(it)[0], ofs, len); 
+    shape_chunk_per_trino_requests(&(it)[0], ofs, len); 
     status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len);
     if (status<0) {
          return -EINVAL;
@@ -909,7 +933,11 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
       if (m_end_scan_sz == -1){
                m_end_scan_sz = s->obj_size;
       }
-      m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+      if ((m_end_scan_sz - m_start_scan_sz)>s->obj_size){ //in the case user provides range bigger than object-size
+       m_object_size_for_processing = s->obj_size;
+      } else {
+       m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+      }
   }
   if (!m_aws_response_handler.is_set()) {
     m_aws_response_handler.set(s, this);
index fdf0a2c07b55532a28c318bddde33257a760c56f..c694c770a1097ace39eba5df11634369bf72f97c 100644 (file)
@@ -214,6 +214,7 @@ private:
 public:
   unsigned int chunk_number;
   size_t m_requested_range;
+  size_t m_scan_offset;
 
   RGWSelectObj_ObjStore_S3();
   virtual ~RGWSelectObj_ObjStore_S3();
@@ -250,6 +251,6 @@ private:
   std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
   std::function<size_t(void)> fp_get_obj_size;
 
-  void continue_to_end_of_csv_row(const char*, off_t ofs, off_t& len);
+  void shape_chunk_per_trino_requests(const char*, off_t& ofs, off_t& len);
 };