From 9ae5219aab347152e3bd8133c0abb56743367c9c Mon Sep 17 00:00:00 2001 From: galsalomon66 Date: Sat, 15 Apr 2023 12:18:39 +0300 Subject: [PATCH] handling more use cases for "shaping" the processed chunk by s3select per Trino request Signed-off-by: galsalomon66 --- src/rgw/rgw_s3select.cc | 43 ++++++++++++++++++++++++---------- src/rgw/rgw_s3select_private.h | 1 + 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc index e0cb55e1a1b93..4a4e72ac9815e 100644 --- a/src/rgw/rgw_s3select.cc +++ b/src/rgw/rgw_s3select.cc @@ -272,7 +272,8 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3(): m_json_type(false), chunk_number(0), m_requested_range(0), - m_scan_offset(1024) + m_scan_offset(1024), + m_skip_next_chunk(false) { set_get_data(true); fp_get_obj_size = [&]() { @@ -755,45 +756,57 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len) { -//in case it is a scan range response, and row might be broken, this routine search for the end-of-row. +//in case it is a scan range request and sent by Trino client. +//this routine chops the start/end of chunks. +//the purpose is to return "perfect" results, with no broken or missing lines. + off_t new_offset = 0; if(m_scan_range_ind){//only upon range-scan - uint64_t sc=0; - uint64_t start =0; + int64_t sc=0; + int64_t start =0; const char* row_delimiter = m_row_delimiter.c_str(); //chop the head of the first chunk. if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){ - char* p = (char*)it_cp; + char* p = const_cast(it_cp); while(strncmp(row_delimiter,p,1) && (p - it_cp) < len)p++; if(!strncmp(row_delimiter,p,1)){ new_offset += (p - it_cp)+1; } } - //chop the end of chunk - //if it's the last chunk, reduce scan-offset and search for first row-delimiter + //chop the end of the last chunk for this request + //if it's the last chunk, search for first row-delimiter for the following different use-cases if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ //had pass the requested range, start to search for first delimiter - if(m_aws_response_handler.get_processed_size()){ - start = m_aws_response_handler.get_processed_size() - m_requested_range; + if(m_aws_response_handler.get_processed_size()>m_requested_range){ + //the previous chunk contain the complete request(all data) and an extra bytes. + //thus, search for the first row-delimiter + start = 0; + } else if(m_aws_response_handler.get_processed_size()){ + //the *current* chunk contain the complete request in the middle of the chunk. + //thus, search for the first row-delimiter after the complete request position + start = m_requested_range - m_aws_response_handler.get_processed_size(); } else { + //the current chunk is the first chunk and it contains complete request start = m_requested_range; } ldout(s->cct, 10) << "S3select: start scan from " << start << " len = " << len << dendl; for(sc=start;sc(it_cp) + ofs + sc; if(!strncmp(row_delimiter,p,1)){ ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl; len = sc + 1;//+1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len) + //the end of row exist in current chunk. + //thus, the next chunk should be skipped + m_skip_next_chunk = true; break; } } ldpp_dout(this, 10) << "len =" << len << dendl; } - ofs += new_offset; } @@ -805,7 +818,11 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len) { int status = 0; - + + if(m_skip_next_chunk == true){ + return status; + } + if (s->obj_size == 0 || m_object_size_for_processing == 0) { status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0); if (status<0){ @@ -933,7 +950,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_ if (m_end_scan_sz == -1){ m_end_scan_sz = s->obj_size; } - if ((m_end_scan_sz - m_start_scan_sz)>s->obj_size){ //in the case user provides range bigger than object-size + if (static_cast((m_end_scan_sz - m_start_scan_sz))>s->obj_size){ //in the case user provides range bigger than object-size m_object_size_for_processing = s->obj_size; } else { m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz; diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h index c694c770a1097..548c87182f657 100644 --- a/src/rgw/rgw_s3select_private.h +++ b/src/rgw/rgw_s3select_private.h @@ -215,6 +215,7 @@ public: unsigned int chunk_number; size_t m_requested_range; size_t m_scan_offset; + bool m_skip_next_chunk; RGWSelectObj_ObjStore_S3(); virtual ~RGWSelectObj_ObjStore_S3(); -- 2.39.5