From f2e7eb59cceab8a08ac7ffea6f842889b866108e Mon Sep 17 00:00:00 2001 From: Gal Salomon Date: Wed, 4 Sep 2024 13:12:55 +0300 Subject: [PATCH] move the parquet-reader-setup call location. editorial. Signed-off-by: Gal Salomon --- src/common/options/rgw.yaml.in | 2 +- src/rgw/rgw_s3select.cc | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/common/options/rgw.yaml.in b/src/common/options/rgw.yaml.in index c47eafe89f91b..2cfc5f80868ba 100644 --- a/src/common/options/rgw.yaml.in +++ b/src/common/options/rgw.yaml.in @@ -63,7 +63,7 @@ options: type: size level: advanced desc: the Maximum parquet buffer size, a limit to memory consumption for parquet reading operations. - default: 1_G + default: 16_M services: - rgw with_legacy: true diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc index 4b1d0ebfef123..39794e990b02b 100644 --- a/src/rgw/rgw_s3select.cc +++ b/src/rgw/rgw_s3select.cc @@ -486,6 +486,8 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query) fp_debug_mesg); try { + //set the Parquet-reader properties, i.e. the buffer size for the Parquet-reader + parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size); //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet); } catch(base_s3select_exception& e) { @@ -751,6 +753,9 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) return; } s3select_syntax.parse_query(m_sql_query.c_str()); + //run_s3select_on_parquet() calls the s3select query-engine, which reads and processes the parquet object with RGW::range_request. + //once the query-engine finishes processing, control returns to execute(); + //the parquet-reader indicates the end of the parquet object. 
status = run_s3select_on_parquet(m_sql_query.c_str()); if (status) { ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl; @@ -758,6 +763,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) } else { {//status per amount of processed data //TODO add number of calls to range_request + //TODO check stats for the amount of data processed (scanned, returned) m_aws_response_handler.init_stats_response(); m_aws_response_handler.send_stats_response(); m_aws_response_handler.init_end_response(); @@ -789,11 +795,11 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y) } int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len) -{ +{//purpose: process the buffer returned by the range-request and pass it to the Parquet-reader. + //range_request() is called by arrow::ReadAt; upon completion, control returns to execute() fp_chunked_transfer_encoding(); size_t append_in_callback = 0; int part_no = 1; - parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size); //concat the requested buffer for (auto& it : bl.buffers()) { if (it.length() == 0) { @@ -1042,7 +1048,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_ if (len == 0 && s->obj_size != 0) { return 0; } - if (m_parquet_type) { + if (m_parquet_type) {//bufferlist sent back upon range-request return parquet_processing(bl,ofs,len); } if (m_json_type) { -- 2.39.5