From: galsalomon66 Date: Tue, 21 Mar 2023 07:17:40 +0000 (+0200) Subject: upon using Trino the Trino-server issue multiple requests per single query,upon compl... X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d73569a46d0ecbdce6f2a0824ec761f04eaa125c;p=ceph.git upon using Trino the Trino-server issue multiple requests per single query,upon completion of all requests the results are merged (by Trino). these request splits the input into equal parts; the RGW side should be aligned with Trino expectations(for result). Signed-off-by: galsalomon66 --- diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc index 1a89afd13c3..8c8a5ef0b29 100644 --- a/src/rgw/rgw_s3select.cc +++ b/src/rgw/rgw_s3select.cc @@ -164,7 +164,7 @@ void aws_response_handler::init_success_response() m_buff_header.clear(); header_size = create_header_records(); sql_result.append(m_buff_header.c_str(), header_size); - sql_result.append(PAYLOAD_LINE); + //sql_result.append(PAYLOAD_LINE); //TODO add switch } void aws_response_handler::send_continuation_response() @@ -215,7 +215,7 @@ void aws_response_handler::init_error_response(const char* error_message) void aws_response_handler::send_success_response() { - sql_result.append(END_PAYLOAD_LINE); + //sql_result.append(END_PAYLOAD_LINE); int buff_len = create_message(header_size); s->formatter->write_bin_data(sql_result.data(), buff_len); rgw_flush_formatter_and_reset(s, s->formatter); @@ -266,7 +266,8 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3(): m_object_size_for_processing(0), m_parquet_type(false), m_json_type(false), - chunk_number(0) + chunk_number(0), + m_requested_range(0) { set_get_data(true); fp_get_obj_size = [&]() { @@ -711,7 +712,15 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) //CSV or JSON processing if (m_scan_range_ind) { //scan-range - range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y); + //TODO add 1024 to m_end_scan_sz. and search for first row-delimiter starting m_end_scan_sz(before adding 1024) + size_t scan_offset = 1024; //TODO data member + + if(s->obj_size && ((m_end_scan_sz + scan_offset) > s->obj_size)){ + scan_offset = s->obj_size - m_end_scan_sz; + } + m_requested_range = (m_end_scan_sz - m_start_scan_sz) + scan_offset; + range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + scan_offset, nullptr, y); + //TODO a second range request?? csv_processing may indicate a need for a second round(what about another one?) } else { RGWGetObj::execute(y); } @@ -742,6 +751,31 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_ } return 0; } +#if 1 +void RGWSelectObj_ObjStore_S3::continue_to_end_of_csv_row(const char* it_cp, off_t ofs, off_t& len) +{ +//in case it is a scan range response, and row-delimiter(\n) is not included in last row +//in needs to fetch another range(defualt = 1024, maybe its better to that on first call) + if(m_scan_range_ind){ + int sc=0; + size_t scan_offset = 1024; //TODO data member + + // if it's the last chunk , reduce scan-offset and search for first "\n" + if((m_aws_response_handler.get_processed_size()+len)>=m_requested_range) + { + for(sc=(len - scan_offset); sc < len; sc++) + { + char* p=(char*)it_cp + ofs + sc; + if(!strncmp("\n",p,1)) + break; + } + len = sc + 1;//+1 for delimiter + ldpp_dout(this, 10) << "len =" << len << dendl; + } + } + m_aws_response_handler.update_processed_size(len); +} +#endif int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len) { @@ -754,38 +788,45 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le } } else { auto bl_len = bl.get_num_buffers(); - int i=0; + int buff_no=0; for(auto& it : bl.buffers()) { - ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs + ldpp_dout(this, 10) << "processing segment " << buff_no << " out of " << bl_len << " off " << ofs << " len " << len << " obj-size " << m_object_size_for_processing << dendl; if (it.length() == 0 || len == 0) { - ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len + ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << buff_no << " out of " << bl_len << " obj-size " << m_object_size_for_processing << dendl; continue; } - //NOTE: the it.length() must be used (not len) - m_aws_response_handler.update_processed_size(it.length()); if((ofs + len) > it.length()){ ldpp_dout(this, 10) << "offset and lenghth may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl; ofs = 0; len = it.length(); } - status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] +ofs, len); - if (status<0) { - return -EINVAL; - } - if (m_s3_csv_object.is_sql_limit_reached()) { - break; - } - i++; + + //TODO : should run only for the last element of bufferlist + continue_to_end_of_csv_row(&(it)[0], ofs, len); + status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len); + if (status<0) { + return -EINVAL; } - } - if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) { + if (m_s3_csv_object.is_sql_limit_reached()) { + break; + } + buff_no++; + }//for + }//else + + ldpp_dout(this, 10) << "s3select : m_aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size() + << " m_object_size_for_processing " << uint64_t(m_object_size_for_processing) << dendl; + + if (m_aws_response_handler.get_processed_size() >= uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) { if (status >=0) { m_aws_response_handler.init_stats_response(); m_aws_response_handler.send_stats_response(); m_aws_response_handler.init_end_response(); + ldpp_dout(this, 10) << "s3select : reached the end of query request : aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size() + << "m_object_size_for_processing : " << m_object_size_for_processing << dendl; } if (m_s3_csv_object.is_sql_limit_reached()) { //stop fetching chunks @@ -793,6 +834,7 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le status = -ENOENT; } } + return status; } diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h index 7ac9167caee..fdf0a2c07b5 100644 --- a/src/rgw/rgw_s3select_private.h +++ b/src/rgw/rgw_s3select_private.h @@ -213,6 +213,7 @@ private: public: unsigned int chunk_number; + size_t m_requested_range; RGWSelectObj_ObjStore_S3(); virtual ~RGWSelectObj_ObjStore_S3(); @@ -249,5 +250,6 @@ private: std::function fp_range_req; std::function fp_get_obj_size; + void continue_to_end_of_csv_row(const char*, off_t ofs, off_t& len); };