From a4d0c9ad49b6304ee35ecbb7dfa816f650f81e1f Mon Sep 17 00:00:00 2001
From: galsalomon66
Date: Fri, 10 Mar 2023 14:27:05 +0200
Subject: [PATCH] rgw: reef: adding s3test albin/json-op-serial

modify the json chunk-processing function to handle offset/length the
same way as csv-processing

fix valgrind error: "Conditional jump or move depends on uninitialised
value"

upon using Trino, the Trino server issues multiple requests per single
query; upon completion of all requests, the results are merged (by
Trino). these requests split the input into equal parts, and the RGW
side should be aligned with Trino's expectations for the result.

fixing the main routine for shaping the chunk (range-scan) for Trino
processing

upon removing the payload-TAG, the response element index needs to
change

handling more use cases for "shaping" the processed chunk by s3select
per Trino request

re-shape the processed chunk only when Trino sent the request

bug-fix: the chunk offset was not handled correctly

bug-fix: progress-message calculation

modifying the range-request boundaries only upon Trino request.

Signed-off-by: galsalomon66
(cherry picked from commit a62588959d28bd2bf16cc3bb9482e22e6f9eb195)
---
 src/rgw/rgw_s3select.cc        | 181 +++++++++++++++++++++++++++------
 src/rgw/rgw_s3select_private.h |   5 +
 2 files changed, 156 insertions(+), 30 deletions(-)

diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc
index 3a11ddf839c..7d93569f3a4 100644
--- a/src/rgw/rgw_s3select.cc
+++ b/src/rgw/rgw_s3select.cc
@@ -164,7 +164,9 @@ void aws_response_handler::init_success_response()
   m_buff_header.clear();
   header_size = create_header_records();
   sql_result.append(m_buff_header.c_str(), header_size);
+#ifdef PAYLOAD_TAG
   sql_result.append(PAYLOAD_LINE);
+#endif
 }
 
 void aws_response_handler::send_continuation_response()
@@ -215,7 +217,9 @@ void aws_response_handler::init_error_response(const char* error_message)
 
 void aws_response_handler::send_success_response()
 {
+#ifdef PAYLOAD_TAG
   sql_result.append(END_PAYLOAD_LINE);
+#endif
   int buff_len = create_message(header_size);
   s->formatter->write_bin_data(sql_result.data(), buff_len);
   rgw_flush_formatter_and_reset(s, s->formatter);
@@ -266,7 +270,11 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_object_size_for_processing(0),
   m_parquet_type(false),
   m_json_type(false),
-  chunk_number(0)
+  chunk_number(0),
+  m_requested_range(0),
+  m_scan_offset(1024),
+  m_skip_next_chunk(false),
+  m_is_trino_request(false)
 {
   set_get_data(true);
   fp_get_obj_size = [&]() {
@@ -339,6 +347,16 @@ int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
     ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
     return -1;
   }
+  const auto& m = s->info.env->get_map();
+  auto user_agent = m.find("HTTP_USER_AGENT"); {
+    if (user_agent != m.end()){
+      if (user_agent->second.find("Trino") != std::string::npos){
+        m_is_trino_request = true;
+        ldpp_dout(this, 10) << "s3-select query: request sent by Trino." << dendl;
+      }
+    }
+  }
+
   int status = handle_aws_cli_parameters(m_sql_query);
   if (status<0) {
     return status;
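
Note: the User-Agent detection added to get_params() above reduces to the
following standalone predicate (a minimal sketch for review purposes; a plain
std::map stands in for RGW's request-environment map, and the sample agent
string is illustrative):

#include <map>
#include <string>

// True when the HTTP client identifies itself as Trino, e.g. a
// User-Agent such as "Trino/411". Substring match, as in the patch.
bool is_trino_request(const std::map<std::string, std::string>& env)
{
  auto user_agent = env.find("HTTP_USER_AGENT");
  return user_agent != env.end() &&
         user_agent->second.find("Trino") != std::string::npos;
}
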
@@ -368,11 +386,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   if (m_escape_char.size()) {
     csv.escape_char = *m_escape_char.c_str();
   }
-  if (m_enable_progress.compare("true")==0) {
-    enable_progress = true;
-  } else {
-    enable_progress = false;
-  }
   if (output_row_delimiter.size()) {
     csv.output_row_delimiter = *output_row_delimiter.c_str();
   }
@@ -429,7 +442,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   }
   if ((length_post_processing-length_before_processing) != 0) {
     ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
-    ldpp_dout(this, 10) << "s3-select: sql-result{" << m_aws_response_handler.get_sql_result() << "}" << dendl;
   } else {
     m_aws_response_handler.send_continuation_response();
   }
@@ -638,6 +650,11 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
       m_end_scan_sz = std::numeric_limits<int64_t>::max();
     }
   }
+  if (m_enable_progress.compare("true")==0) {
+    enable_progress = true;
+  } else {
+    enable_progress = false;
+  }
   return 0;
 }
 
@@ -710,8 +727,18 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
   } else {
     //CSV or JSON processing
     if (m_scan_range_ind) {
-      //scan-range
-      range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y);
+
+      m_requested_range = (m_end_scan_sz - m_start_scan_sz);
+
+      if(m_is_trino_request){
+        // fetch more than requested (m_scan_offset); the additional bytes are scanned for end of row,
+        // thus the additional length will be processed, and no broken row for Trino.
+        // assumption: row is smaller than m_scan_offset. (a different approach is to request an additional range)
+        range_request(m_start_scan_sz, m_requested_range + m_scan_offset, nullptr, y);
+      } else {
+        range_request(m_start_scan_sz, m_requested_range, nullptr, y);
+      }
+
     } else {
       RGWGetObj::execute(y);
     }
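
Note: the fetch-size decision in execute() above is small enough to read as a
pure function; the sketch below mirrors m_requested_range/m_scan_offset under
the patch's stated assumption that no row exceeds the scan offset (names are
illustrative, not RGW API):

#include <cstdint>

// Size of the range actually fetched for a scan-range request.
// A Trino client gets scan_offset extra bytes so that the row which
// straddles the range boundary can be completed instead of truncated.
int64_t bytes_to_fetch(int64_t start_scan, int64_t end_scan,
                       bool is_trino_request, int64_t scan_offset = 1024)
{
  const int64_t requested_range = end_scan - start_scan;
  // assumption carried over from the patch: no row is longer than scan_offset
  return is_trino_request ? requested_range + scan_offset : requested_range;
}

Trino merges the per-range results itself, so over-fetching is safe as long as
the output is later trimmed back to whole rows, which is exactly what
shape_chunk_per_trino_requests() below does.
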
@@ -743,10 +770,80 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
   return 0;
 }
 
+void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len)
+{
+//in case it is a scan-range request sent by a Trino client,
+//this routine chops the start/end of chunks.
+//the purpose is to return "perfect" results, with no broken or missing lines.
+
+  off_t new_offset = 0;
+  if(m_scan_range_ind){//only upon range-scan
+    int64_t sc=0;
+    int64_t start =0;
+    const char* row_delimiter = m_row_delimiter.c_str();
+
+    ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should be modified." << dendl;
+
+    //chop the head of the first chunk, and only when the slice does not include the head of the object.
+    if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+      char* p = const_cast<char*>(it_cp+ofs);
+      while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++;
+      if(!strncmp(row_delimiter,p,1)){
+        new_offset += (p - (it_cp+ofs))+1;
+      }
+    }
+
+    //RR : end of the range-request, the original request sent by the Trino client
+    //RD : row-delimiter
+    //[ ... ] : chunk boundaries
+
+    //chop the end of the last chunk for this request
+    //if it's the last chunk, search for the first row-delimiter for the following different use-cases
+    if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
+      //passed the requested range, start to search for the first delimiter
+      if(m_aws_response_handler.get_processed_size()>m_requested_range){
+        //the previous chunk contains the complete request (all data) and extra bytes.
+        //thus, search for the first row-delimiter
+        //[:previous (RR) ... ][:current (RD) ]
+        start = 0;
+      } else if(m_aws_response_handler.get_processed_size()){
+        //the *current* chunk contains the complete request in the middle of the chunk.
+        //thus, search for the first row-delimiter after the complete-request position
+        //[:current (RR) .... (RD) ]
+        start = m_requested_range - m_aws_response_handler.get_processed_size();
+      } else {
+        //the current chunk is the first chunk and it contains the complete request
+        //[:current:first-chunk (RR) .... (RD) ]
+        start = m_requested_range;
+      }
+
+      for(sc=start;sc<len;sc++){
+        char* p = const_cast<char*>(it_cp) + ofs + sc;
+        if(!strncmp(row_delimiter,p,1)){
+          ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
+          len = sc + 1;//+1 is for the delimiter. TODO what about m_object_size_for_processing (to update according to len)
+          //the end of row exists in the current chunk.
+          //thus, the next chunk should be skipped
+          m_skip_next_chunk = true;
+          break;
+        }
+      }
+    }
+    ofs += new_offset;
+  }
+
+  ldout(s->cct, 10) << "S3select: shape_chunk_per_trino_requests:update progress len = " << len << dendl;
+  len -= new_offset;
+}
+
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
-
+  if(m_skip_next_chunk == true){
+    return status;
+  }
+
   if (s->obj_size == 0 || m_object_size_for_processing == 0) {
     status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0);
     if (status<0){
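
Note: the trimming performed by shape_chunk_per_trino_requests() is easier to
follow on a flat buffer. The sketch below reproduces the same two steps,
assuming '\n' as the row delimiter (trim_to_rows is a hypothetical helper, not
part of the patch): skip past the first delimiter when the slice does not
start at the head of the object, then cut right after the first delimiter at
or beyond the requested range:

#include <cstddef>

// Trim [buf, buf+len) so it starts and ends on row boundaries.
// 'range_started_mid_object' mirrors (m_start_scan_sz != 0) on the first
// chunk; 'requested' mirrors m_requested_range. Advances *off past a
// partial head row and returns the trimmed length.
size_t trim_to_rows(const char* buf, size_t len, size_t* off,
                    bool range_started_mid_object, size_t requested)
{
  size_t begin = 0;
  if (range_started_mid_object) {
    while (begin < len && buf[begin] != '\n') begin++;  // skip broken head row
    if (begin < len) begin++;                           // step past the delimiter
  }
  size_t end = requested < len ? requested : len;
  while (end < len && buf[end] != '\n') end++;          // complete the tail row
  if (end < len) end++;                                 // keep the delimiter
  *off += begin;
  return end > begin ? end - begin : 0;
}

For example, on a chunk fetched from mid-object, begin lands just past the
first '\n' and end just past the first '\n' at or beyond 'requested', so only
whole rows remain in the window.
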
segment " << buff_no << " out of " << bl_len << " obj-size " << m_object_size_for_processing << dendl; continue; } - //NOTE: the it.length() must be used (not len) - m_aws_response_handler.update_processed_size(it.length()); if((ofs + len) > it.length()){ - ldpp_dout(this, 10) << "offset and lenghth may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl; + ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl; ofs = 0; len = it.length(); } - status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] +ofs, len); - if (status<0) { - return -EINVAL; - } - if (m_s3_csv_object.is_sql_limit_reached()) { - break; - } - i++; + + if(m_is_trino_request){ + shape_chunk_per_trino_requests(&(it)[0], ofs, len); } - } - if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) { + + ldpp_dout(this, 10) << "s3select: chunk: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << " m_object_size_for_processing = " << m_object_size_for_processing << dendl; + + m_aws_response_handler.update_processed_size(it.length());//NOTE : to run analysis to validate len is aligned with m_processed_bytes + status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len); + if (status<0) { + return -EINVAL; + } + if (m_s3_csv_object.is_sql_limit_reached()) { + break; + } + buff_no++; + }//for + }//else + + ldpp_dout(this, 10) << "s3select : m_aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size() + << " m_object_size_for_processing " << uint64_t(m_object_size_for_processing) << dendl; + + if (m_aws_response_handler.get_processed_size() >= uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) { if (status >=0) { m_aws_response_handler.init_stats_response(); m_aws_response_handler.send_stats_response(); m_aws_response_handler.init_end_response(); + ldpp_dout(this, 10) << "s3select : reached the end of query request : aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size() + << "m_object_size_for_processing : " << m_object_size_for_processing << dendl; } if (m_s3_csv_object.is_sql_limit_reached()) { //stop fetching chunks @@ -793,6 +902,7 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le status = -ENOENT; } } + return status; } @@ -818,8 +928,15 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l << " obj-size " << m_object_size_for_processing << dendl; continue; } + + if((ofs + len) > it.length()){ + ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl; + ofs = 0; + len = it.length(); + } + m_aws_response_handler.update_processed_size(len); - status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0], len); + status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0] + ofs, len); if (status<0) { status = -EINVAL; break; @@ -828,8 +945,8 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l break; } i++; - } - } + }//for + }//else if (status>=0 && (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_json_object.is_sql_limit_reached())) { //flush the internal JSON buffer(upon last chunk) @@ -860,7 +977,11 @@ int 
RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
     if (m_end_scan_sz == -1){
       m_end_scan_sz = s->obj_size;
     }
-    m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+    if (static_cast<uint64_t>((m_end_scan_sz - m_start_scan_sz)) > s->obj_size){ //in case the user provides a range bigger than the object-size
+      m_object_size_for_processing = s->obj_size;
+    } else {
+      m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+    }
   }
   if (!m_aws_response_handler.is_set()) {
     m_aws_response_handler.set(s, this);
diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h
index 7ac9167caee..fa595b0da59 100644
--- a/src/rgw/rgw_s3select_private.h
+++ b/src/rgw/rgw_s3select_private.h
@@ -213,6 +213,10 @@ private:
 
 public:
   unsigned int chunk_number;
+  size_t m_requested_range;
+  size_t m_scan_offset;
+  bool m_skip_next_chunk;
+  bool m_is_trino_request;
 
   RGWSelectObj_ObjStore_S3();
   virtual ~RGWSelectObj_ObjStore_S3();
@@ -249,5 +253,6 @@ private:
   std::function fp_range_req;
   std::function fp_get_obj_size;
 
+  void shape_chunk_per_trino_requests(const char*, off_t& ofs, off_t& len);
 };
-- 
2.39.5
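
Note: a worked reading of the clamp added to send_response_data() above
(self-contained sketch with illustrative values; effective_processing_size is
a hypothetical name, not patch code):

#include <cassert>
#include <cstdint>

// The clamp from send_response_data(), extracted: an open-ended range
// runs to EOF, and a range larger than the object is cut to object size.
int64_t effective_processing_size(int64_t start, int64_t end, uint64_t obj_size)
{
  if (end == -1) {
    end = obj_size;                  // open-ended range: scan to EOF
  }
  if (static_cast<uint64_t>(end - start) > obj_size) {
    return obj_size;                 // range bigger than the object
  }
  return end - start;
}

int main()
{
  assert(effective_processing_size(200, 5000, 1000) == 1000); // oversized range
  assert(effective_processing_size(0, -1, 1000) == 1000);     // open-ended
  assert(effective_processing_size(100, 300, 1000) == 200);   // normal case
}
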