From: Gal Salomon
Date: Mon, 2 Sep 2024 17:19:54 +0000 (+0300)
Subject: s3select submodule.
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=32a67e854c93cbda6202b45a78ccd79c9c52608c;p=ceph.git

s3select submodule.
RGW option for the parquet read-buffer size.
indentation.
alignment with updated s3select APIs.

Signed-off-by: Gal Salomon
---

diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in
index 09d37dfcd8261..c47eafe89f91b 100644
--- a/src/common/options/rgw.yaml.in
+++ b/src/common/options/rgw.yaml.in
@@ -59,6 +59,14 @@ options:
   services:
   - rgw
   with_legacy: true
+- name: rgw_parquet_buffer_size
+  type: size
+  level: advanced
+  desc: the maximum parquet buffer size, a limit on memory consumption for parquet reading operations.
+  default: 1_G
+  services:
+  - rgw
+  with_legacy: true
 - name: rgw_rados_tracing
   type: bool
   level: advanced
diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc
index 800d276a6aab1..79e6c53167ec9 100644
--- a/src/rgw/rgw_s3select.cc
+++ b/src/rgw/rgw_s3select.cc
@@ -287,6 +287,7 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_object_size_for_processing(0),
   m_parquet_type(false),
   m_json_type(false),
+  m_outputFormat(OutputFormat::CSV),
   chunk_number(0),
   m_requested_range(0),
   m_scan_offset(1024),
@@ -344,7 +345,7 @@ RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
 
 int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
 {
-  if(m_s3select_query.empty() == false) {
+  if (m_s3select_query.empty() == false) {
     return 0;
   }
 #ifndef _ARROW_EXIST
@@ -416,17 +417,19 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   if (output_escape_char.size()) {
     csv.output_escape_char = *output_escape_char.c_str();
   }
-  if(output_quote_fields.compare("ALWAYS") == 0) {
+  if (output_quote_fields.compare("ALWAYS") == 0) {
     csv.quote_fields_always = true;
-  } else if(output_quote_fields.compare("ASNEEDED") == 0) {
+  } else if (output_quote_fields.compare("ASNEEDED") == 0) {
     csv.quote_fields_asneeded = true;
   }
-  if(m_header_info.compare("IGNORE")==0) {
+  if (m_header_info.compare("IGNORE")==0) {
     csv.ignore_header_info=true;
-  } else if(m_header_info.compare("USE")==0) {
+  } else if (m_header_info.compare("USE")==0) {
     csv.use_header_info=true;
   }
-
+  if (m_outputFormat == OutputFormat::JSON) {
+    csv.output_json_format = true;
+  }
   m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
 
   m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
@@ -478,6 +481,10 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   if (!m_s3_parquet_object.is_set()) {
     //parsing the SQL statement.
     s3select_syntax.parse_query(m_sql_query.c_str());
+    parquet_object::csv_definitions parquet;
+    if (m_outputFormat == OutputFormat::JSON) {
+      parquet.output_json_format = true;
+    }
 
     m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
       fp_s3select_result_format,
@@ -486,7 +493,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
 
   try {
     //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object
-    m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
+    m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet);
   } catch(base_s3select_exception& e) {
     ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
     fp_result_header_format(m_aws_response_handler.get_sql_result());
@@ -524,6 +531,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
     fp_s3select_result_format,
     fp_result_header_format,
     fp_debug_mesg);
+  json_object::csv_definitions json;
 
   m_aws_response_handler.init_response();
 
@@ -536,6 +544,10 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
     return -EINVAL;
   }
 
+  if (m_outputFormat == OutputFormat::JSON) {
+    json.output_json_format = true;
+  }
+
   //parsing the SQL statement
   s3select_syntax.parse_query(m_sql_query.c_str());
   if (s3select_syntax.get_error_description().empty() == false) {
@@ -547,8 +559,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   }
 
   //initializing json processor
-  json_object::csv_definitions output_definition;
-  m_s3_json_object.set_json_query(&s3select_syntax,output_definition);
+  m_s3_json_object.set_json_query(&s3select_syntax, json);
 
   if (input == nullptr) {
     input = "";
@@ -618,6 +629,10 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
     ldpp_dout(this, 10) << "s3select: engine is set to process Parquet objects" << dendl;
   }
 
+  if (m_s3select_query.find(output_tag+">", 0);
@@ -706,6 +721,7 @@ int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff
   RGWGetObj::parse_range();
   requested_buffer.clear();
   m_request_range = len;
+  m_aws_response_handler.update_processed_size(len);
   ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
   RGWGetObj::execute(y);
   if (buff) {
@@ -730,7 +746,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
   }
 
-  if(s->cct->_conf->rgw_disable_s3select == true)
+  if (s->cct->_conf->rgw_disable_s3select == true)
   {
     std::string error_msg="s3select : is disabled by rgw_disable_s3select configuration parameter";
     ldpp_dout(this, 10) << error_msg << dendl;
@@ -754,7 +770,16 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
       ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
       op_ret = -ERR_INVALID_REQUEST;
     } else {
-      ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
+      {//status per amount of processed data
+        //TODO add number of calls to range_request
+        m_aws_response_handler.init_stats_response();
+        m_aws_response_handler.send_stats_response();
+        m_aws_response_handler.init_end_response();
+        ldpp_dout(this, 10) << "s3select : reached the end of parquet query request : aws_response_handler.get_processed_size() "
+                            << m_aws_response_handler.get_processed_size()
+                            << " m_object_size_for_processing : " << m_object_size_for_processing << dendl;
+      }
+      ldout(s->cct, 10) << "S3select: complete parquet query with success " << dendl;
     }
   } else {
     //CSV or JSON processing
@@ -762,7 +787,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 
     m_requested_range = (m_end_scan_sz - m_start_scan_sz);
 
-    if(m_is_trino_request){
+    if (m_is_trino_request){
       // fetch more than requested(m_scan_offset), that additional bytes are scanned for end of row,
       // thus the additional length will be processed, and no broken row for Trino.
       // assumption: row is smaller than m_scan_offset. (a different approach is to request for additional range)
@@ -782,6 +807,7 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
   fp_chunked_transfer_encoding();
   size_t append_in_callback = 0;
   int part_no = 1;
+  parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size);
   //concat the requested buffer
   for (auto& it : bl.buffers()) {
     if (it.length() == 0) {
@@ -789,6 +815,11 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
     }
     append_in_callback += it.length();
     ldout(s->cct, 10) << "S3select: part " << part_no++ << " it.length() = " << it.length() << dendl;
+    if ((ofs + len) > it.length()){
+      ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+      ofs = 0;
+      len = it.length();
+    }
     requested_buffer.append(&(it)[0]+ofs, len);
   }
   ldout(s->cct, 10) << "S3select:append_in_callback = " << append_in_callback << dendl;
@@ -809,7 +840,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
   //the purpose is to return "perfect" results, with no broken or missing lines.
 
   off_t new_offset = 0;
-  if(m_scan_range_ind){//only upon range-scan
+  if (m_scan_range_ind){//only upon range-scan
     int64_t sc=0;
     int64_t start =0;
     const char* row_delimiter = m_row_delimiter.c_str();
@@ -817,10 +848,10 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
     ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should modified." << dendl;
 
     //chop the head of the first chunk and only upon the slice does not include the head of the object.
-    if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+    if (m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
       char* p = const_cast<char*>(it_cp+ofs);
       while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++;
-      if(!strncmp(row_delimiter,p,1)){
+      if (!strncmp(row_delimiter,p,1)){
        new_offset += (p - (it_cp+ofs))+1;
       }
     }
@@ -831,14 +862,14 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 
     //chop the end of the last chunk for this request
     //if it's the last chunk, search for first row-delimiter for the following different use-cases
-    if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
+    if ((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
       //had pass the requested range, start to search for first delimiter
-      if(m_aws_response_handler.get_processed_size()>m_requested_range){
+      if (m_aws_response_handler.get_processed_size()>m_requested_range){
        //the previous chunk contain the complete request(all data) and an extra bytes.
       //thus, search for the first row-delimiter
       //[:previous (RR) ... ][:current (RD) ]
       start = 0;
-      } else if(m_aws_response_handler.get_processed_size()){
+      } else if (m_aws_response_handler.get_processed_size()){
       //the *current* chunk contain the complete request in the middle of the chunk.
       //thus, search for the first row-delimiter after the complete request position
       //[:current (RR) .... (RD) ]
@@ -852,7 +883,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
     for(sc=start;sc<len;sc++)
     {
       char* p = const_cast<char*>(it_cp) + ofs + sc;
-      if(!strncmp(row_delimiter,p,1)){
+      if (!strncmp(row_delimiter,p,1)){
        ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
        len = sc + 1;//+1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len)
        //the end of row exist in current chunk.
@@ -872,7 +903,7 @@
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
-  if(m_skip_next_chunk == true){
+  if (m_skip_next_chunk == true){
     return status;
   }
 
@@ -894,13 +925,13 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
     }
 
-    if(ofs > it.length()){
+    if (ofs > it.length()){
      //safety check
      ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
      ofs = 0;
     }
 
-    if(m_is_trino_request){
+    if (m_is_trino_request){
      //TODO replace len with it.length() ? ; test Trino flow with compressed objects.
      //is it possible to send get-by-ranges? in parallel?
      shape_chunk_per_trino_requests(&(it)[0], ofs, len);
 
@@ -964,7 +995,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
       continue;
     }
 
-    if((ofs + len) > it.length()){
+    if ((ofs + len) > it.length()){
       ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
       ofs = 0;
       len = it.length();
diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h
index 7beac4f4a5d88..87d180acebc0f 100644
--- a/src/rgw/rgw_s3select_private.h
+++ b/src/rgw/rgw_s3select_private.h
@@ -241,6 +241,11 @@ private:
   const char* s3select_json_error = "InvalidJsonType";
 
 public:
+  enum class OutputFormat {
+    CSV,
+    JSON
+  };
+  OutputFormat m_outputFormat;
   unsigned int chunk_number;
   size_t m_requested_range;
   size_t m_scan_offset;
diff --git a/src/s3select b/src/s3select
index 9b9f35743ef2a..a8cafe8960816 160000
--- a/src/s3select
+++ b/src/s3select
@@ -1 +1 @@
-Subproject commit 9b9f35743ef2a1828e7a0ec26ffb02711e7e3d7d
+Subproject commit a8cafe89608169bc8cc229f9ed14e022e43149b8
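For reviewers unfamiliar with the new option's plumbing, below is a minimal, self-contained sketch of the call pattern parquet_processing() now uses: a process-wide configuration singleton whose reader-properties cap is set from rgw_parquet_buffer_size before data is appended for the parquet reader. The class and method names mirror the diff (parquet::ceph::S3select_Config lives in the s3select submodule); the body here is illustrative only, under assumed internals, and is not the submodule's actual implementation.

#include <cstdint>
#include <iostream>

// Illustrative stand-in for parquet::ceph::S3select_Config (s3select submodule).
// A Meyers singleton holding the parquet reader properties; RGW sets it from
// rgw_parquet_buffer_size at the top of parquet_processing().
class S3select_Config {
public:
  static S3select_Config& getInstance() {
    static S3select_Config instance; // thread-safe initialization since C++11
    return instance;
  }
  // mirrors set_s3select_reader_properties() in the diff: caps the memory
  // the parquet reader may consume for its read buffer (assumed semantics)
  void set_s3select_reader_properties(uint64_t buffer_size) {
    m_reader_buffer_size = buffer_size;
  }
  uint64_t get_reader_buffer_size() const { return m_reader_buffer_size; }

private:
  S3select_Config() = default;
  uint64_t m_reader_buffer_size{1ULL << 30}; // matches the option's 1_G default
};

int main() {
  // as in parquet_processing(): propagate the configured limit before reading;
  // 512 MiB stands in for s->cct->_conf->rgw_parquet_buffer_size
  S3select_Config::getInstance().set_s3select_reader_properties(512ULL << 20);
  std::cout << "parquet read-buffer cap: "
            << S3select_Config::getInstance().get_reader_buffer_size()
            << " bytes\n";
  return 0;
}

A singleton keeps the limit visible to the arrow/parquet reader without threading a config handle through every s3select call; the trade-off is that the value is per-process, so it is re-applied on each request before the buffers are concatenated.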