]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
upon using Trino the Trino-server issue multiple requests per single query,upon compl...
authorgalsalomon66 <gal.salomon@gmail.com>
Tue, 21 Mar 2023 07:17:40 +0000 (09:17 +0200)
committergalsalomon66 <gal.salomon@gmail.com>
Mon, 10 Apr 2023 19:19:13 +0000 (22:19 +0300)
the results are merged (by Trino).  these request splits the input into equal parts; the RGW side should be aligned with Trino expectations(for result).

Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h

index 1a89afd13c313165c8c7124a58b2a59e910f506b..8c8a5ef0b29e2c3ba11ab05889342964d8e3b9a0 100644 (file)
@@ -164,7 +164,7 @@ void aws_response_handler::init_success_response()
   m_buff_header.clear();
   header_size = create_header_records();
   sql_result.append(m_buff_header.c_str(), header_size);
-  sql_result.append(PAYLOAD_LINE);
+  //sql_result.append(PAYLOAD_LINE); //TODO add switch
 }
 
 void aws_response_handler::send_continuation_response()
@@ -215,7 +215,7 @@ void aws_response_handler::init_error_response(const char* error_message)
 
 void aws_response_handler::send_success_response()
 {
-  sql_result.append(END_PAYLOAD_LINE);
+  //sql_result.append(END_PAYLOAD_LINE);
   int buff_len = create_message(header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
@@ -266,7 +266,8 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_object_size_for_processing(0),
   m_parquet_type(false),
   m_json_type(false),
-  chunk_number(0)
+  chunk_number(0),
+  m_requested_range(0)
 {
   set_get_data(true);
   fp_get_obj_size = [&]() {
@@ -711,7 +712,15 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
        //CSV or JSON processing
        if (m_scan_range_ind) {
          //scan-range
-         range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y);
+         //TODO add 1024 to m_end_scan_sz.  and search for first row-delimiter starting m_end_scan_sz(before adding 1024)
+         size_t scan_offset = 1024; //TODO data member
+
+         if(s->obj_size &&  ((m_end_scan_sz + scan_offset) > s->obj_size)){
+           scan_offset = s->obj_size - m_end_scan_sz;
+         }
+         m_requested_range = (m_end_scan_sz - m_start_scan_sz) + scan_offset;
+         range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + scan_offset, nullptr, y);
+         //TODO a second range request?? csv_processing may indicate a need for a second round(what about another one?)
        } else {
          RGWGetObj::execute(y);
        }
@@ -742,6 +751,31 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
     }
     return 0;
 }
+#if 1
+void RGWSelectObj_ObjStore_S3::continue_to_end_of_csv_row(const char* it_cp, off_t ofs, off_t& len)
+{
+//in case it is a scan range response, and row-delimiter(\n) is not included in last row
+//in needs to fetch another range(defualt = 1024, maybe its better to that on first call)
+  if(m_scan_range_ind){
+  int sc=0;
+  size_t scan_offset = 1024; //TODO data member 
+       
+    // if it's the last chunk , reduce scan-offset and search for first "\n"
+    if((m_aws_response_handler.get_processed_size()+len)>=m_requested_range)
+    {
+      for(sc=(len - scan_offset); sc < len; sc++)
+      {
+       char* p=(char*)it_cp + ofs + sc;
+       if(!strncmp("\n",p,1))
+             break;
+      }
+      len = sc + 1;//+1 for delimiter
+      ldpp_dout(this, 10) << "len =" << len << dendl;
+    }
+  }
+  m_aws_response_handler.update_processed_size(len);
+}
+#endif 
 
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
@@ -754,38 +788,45 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
     }
   } else {
     auto bl_len = bl.get_num_buffers();
-    int i=0;
+    int buff_no=0;
     for(auto& it : bl.buffers()) {
-      ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+      ldpp_dout(this, 10) << "processing segment " << buff_no << " out of " << bl_len << " off " << ofs
                           << " len " << len << " obj-size " << m_object_size_for_processing << dendl;
       if (it.length() == 0 || len == 0) {
-        ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
+        ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << buff_no << " out of " << bl_len
                             <<  " obj-size " << m_object_size_for_processing << dendl;
         continue;
       }
-      //NOTE: the it.length() must be used (not len)
-      m_aws_response_handler.update_processed_size(it.length());
 
       if((ofs + len) > it.length()){
        ldpp_dout(this, 10) << "offset and lenghth may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
        ofs = 0;
        len = it.length();
       }
-      status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] +ofs, len);
-      if (status<0) {
-       return -EINVAL;
-      }
-      if (m_s3_csv_object.is_sql_limit_reached()) {
-       break;
-      }
-      i++;
+
+    //TODO : should run only for the last element of bufferlist
+    continue_to_end_of_csv_row(&(it)[0], ofs, len); 
+    status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len);
+    if (status<0) {
+         return -EINVAL;
     }
-  }
-  if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
+    if (m_s3_csv_object.is_sql_limit_reached()) {
+         break;
+    }
+    buff_no++;
+  }//for
+  }//else
+
+  ldpp_dout(this, 10) << "s3select : m_aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size() 
+  << " m_object_size_for_processing " << uint64_t(m_object_size_for_processing) << dendl;
+
+  if (m_aws_response_handler.get_processed_size() >= uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
     if (status >=0) {
       m_aws_response_handler.init_stats_response();
       m_aws_response_handler.send_stats_response();
       m_aws_response_handler.init_end_response();
+      ldpp_dout(this, 10) << "s3select : reached the end of query request : aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size()
+      << "m_object_size_for_processing : " << m_object_size_for_processing << dendl;
     }
     if (m_s3_csv_object.is_sql_limit_reached()) {
     //stop fetching chunks
@@ -793,6 +834,7 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
     status = -ENOENT;
     }
   }
+
   return status;
 }
 
index 7ac9167caee4fa0f75c9236164f2329788f637bd..fdf0a2c07b55532a28c318bddde33257a760c56f 100644 (file)
@@ -213,6 +213,7 @@ private:
 
 public:
   unsigned int chunk_number;
+  size_t m_requested_range;
 
   RGWSelectObj_ObjStore_S3();
   virtual ~RGWSelectObj_ObjStore_S3();
@@ -249,5 +250,6 @@ private:
   std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
   std::function<size_t(void)> fp_get_obj_size;
 
+  void continue_to_end_of_csv_row(const char*, off_t ofs, off_t& len);
 };