From 77db03388a739d9a130d020d74955c0ca99e367a Mon Sep 17 00:00:00 2001 From: galsalomon66 Date: Tue, 4 Apr 2023 20:00:23 +0300 Subject: [PATCH] fixing the main routine for shaping the chunk (range-scan) for Trino processing Signed-off-by: galsalomon66 --- src/rgw/rgw_s3select.cc | 96 ++++++++++++++++++++++------------ src/rgw/rgw_s3select_private.h | 3 +- 2 files changed, 64 insertions(+), 35 deletions(-) diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc index 8c8a5ef0b29e2..e0cb55e1a1b93 100644 --- a/src/rgw/rgw_s3select.cc +++ b/src/rgw/rgw_s3select.cc @@ -164,7 +164,9 @@ void aws_response_handler::init_success_response() m_buff_header.clear(); header_size = create_header_records(); sql_result.append(m_buff_header.c_str(), header_size); - //sql_result.append(PAYLOAD_LINE); //TODO add switch +#ifdef PAYLOAD_TAG + sql_result.append(PAYLOAD_LINE); +#endif } void aws_response_handler::send_continuation_response() @@ -215,7 +217,9 @@ void aws_response_handler::init_error_response(const char* error_message) void aws_response_handler::send_success_response() { - //sql_result.append(END_PAYLOAD_LINE); +#ifdef PAYLOAD_TAG + sql_result.append(END_PAYLOAD_LINE); +#endif int buff_len = create_message(header_size); s->formatter->write_bin_data(sql_result.data(), buff_len); rgw_flush_formatter_and_reset(s, s->formatter); @@ -267,7 +271,8 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3(): m_parquet_type(false), m_json_type(false), chunk_number(0), - m_requested_range(0) + m_requested_range(0), + m_scan_offset(1024) { set_get_data(true); fp_get_obj_size = [&]() { @@ -711,16 +716,12 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) } else { //CSV or JSON processing if (m_scan_range_ind) { - //scan-range - //TODO add 1024 to m_end_scan_sz. and search for first row-delimiter starting m_end_scan_sz(before adding 1024) - size_t scan_offset = 1024; //TODO data member - - if(s->obj_size && ((m_end_scan_sz + scan_offset) > s->obj_size)){ - scan_offset = s->obj_size - m_end_scan_sz; - } - m_requested_range = (m_end_scan_sz - m_start_scan_sz) + scan_offset; - range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + scan_offset, nullptr, y); - //TODO a second range request?? csv_processing may indicate a need for a second round(what about another one?) + + m_requested_range = (m_end_scan_sz - m_start_scan_sz); + // fetch more than requested(m_scan_offset), that additional bytes are scanned for end of row, + // thus the additional length will be processed, and no broken row for Trino. + // assumption: row is smaller than m_scan_offset. (a different approach is to request for additional range) + range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + m_scan_offset, nullptr, y); } else { RGWGetObj::execute(y); } @@ -751,31 +752,55 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_ } return 0; } -#if 1 -void RGWSelectObj_ObjStore_S3::continue_to_end_of_csv_row(const char* it_cp, off_t ofs, off_t& len) -{ -//in case it is a scan range response, and row-delimiter(\n) is not included in last row -//in needs to fetch another range(defualt = 1024, maybe its better to that on first call) - if(m_scan_range_ind){ - int sc=0; - size_t scan_offset = 1024; //TODO data member - - // if it's the last chunk , reduce scan-offset and search for first "\n" - if((m_aws_response_handler.get_processed_size()+len)>=m_requested_range) - { - for(sc=(len - scan_offset); sc < len; sc++) + +void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len) +{ +//in case it is a scan range response, and row might be broken, this routine search for the end-of-row. + off_t new_offset = 0; + if(m_scan_range_ind){//only upon range-scan + uint64_t sc=0; + uint64_t start =0; + const char* row_delimiter = m_row_delimiter.c_str(); + + //chop the head of the first chunk. + if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){ + char* p = (char*)it_cp; + while(strncmp(row_delimiter,p,1) && (p - it_cp) < len)p++; + if(!strncmp(row_delimiter,p,1)){ + new_offset += (p - it_cp)+1; + } + } + + //chop the end of chunk + //if it's the last chunk, reduce scan-offset and search for first row-delimiter + if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){ + //had pass the requested range, start to search for first delimiter + if(m_aws_response_handler.get_processed_size()){ + start = m_aws_response_handler.get_processed_size() - m_requested_range; + } else { + start = m_requested_range; + } + + ldout(s->cct, 10) << "S3select: start scan from " << start << " len = " << len << dendl; + for(sc=start;sccct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl; + len = sc + 1;//+1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len) break; + } } - len = sc + 1;//+1 for delimiter ldpp_dout(this, 10) << "len =" << len << dendl; } + + ofs += new_offset; } - m_aws_response_handler.update_processed_size(len); + + ldout(s->cct, 10) << "S3select: shape_chunk_per_trino_requests:update progress len = " << len << dendl; + m_aws_response_handler.update_processed_size(len);//TODO : to run analysis to validate len is aligned with m_processed_bytes + len -= new_offset; } -#endif int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len) { @@ -804,8 +829,7 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le len = it.length(); } - //TODO : should run only for the last element of bufferlist - continue_to_end_of_csv_row(&(it)[0], ofs, len); + shape_chunk_per_trino_requests(&(it)[0], ofs, len); status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len); if (status<0) { return -EINVAL; @@ -909,7 +933,11 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_ if (m_end_scan_sz == -1){ m_end_scan_sz = s->obj_size; } - m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz; + if ((m_end_scan_sz - m_start_scan_sz)>s->obj_size){ //in the case user provides range bigger than object-size + m_object_size_for_processing = s->obj_size; + } else { + m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz; + } } if (!m_aws_response_handler.is_set()) { m_aws_response_handler.set(s, this); diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h index fdf0a2c07b555..c694c770a1097 100644 --- a/src/rgw/rgw_s3select_private.h +++ b/src/rgw/rgw_s3select_private.h @@ -214,6 +214,7 @@ private: public: unsigned int chunk_number; size_t m_requested_range; + size_t m_scan_offset; RGWSelectObj_ObjStore_S3(); virtual ~RGWSelectObj_ObjStore_S3(); @@ -250,6 +251,6 @@ private: std::function fp_range_req; std::function fp_get_obj_size; - void continue_to_end_of_csv_row(const char*, off_t ofs, off_t& len); + void shape_chunk_per_trino_requests(const char*, off_t& ofs, off_t& len); }; -- 2.39.5