]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
move the parquet-reader-setup call location.
authorGal Salomon <gal.salomon@gmail.com>
Wed, 4 Sep 2024 10:12:55 +0000 (13:12 +0300)
committerGal Salomon <gal.salomon@gmail.com>
Mon, 30 Sep 2024 13:06:39 +0000 (16:06 +0300)
editorial.

Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
src/common/options/rgw.yaml.in
src/rgw/rgw_s3select.cc

index c47eafe89f91bf535beeb88e8fae401e6a2337a0..2cfc5f80868ba15fcfa7167a1320ca6ef81ed157 100644 (file)
@@ -63,7 +63,7 @@ options:
   type: size
   level: advanced
   desc: the Maximum parquet buffer size, a limit to memory consumption for parquet reading operations.
-  default: 1_G
+  default: 16_M
   services:
   - rgw
   with_legacy: true
index 4b1d0ebfef12307b74104eba12d9e33fc65240c9..39794e990b02b2cff8eac04642e3448c4042798b 100644 (file)
@@ -486,6 +486,8 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
                                                fp_debug_mesg);
 
     try {
+      //setting the Parquet-reader properties. i.e. the buffer-size for the Parquet-reader
+      parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size);
       //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object 
       m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet);
     } catch(base_s3select_exception& e) {
@@ -751,6 +753,9 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
       return;
     }
     s3select_syntax.parse_query(m_sql_query.c_str());
+    //the run_s3select_on_parquet() calling the s3select-query-engine, that read and process the parquet object with RGW::range_request, 
+    //upon query-engine finish the processing, the control is back to execute()
+    //the parquet-reader indicates the end of the parquet object.
     status = run_s3select_on_parquet(m_sql_query.c_str());
     if (status) {
       ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
@@ -758,6 +763,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     } else {
       {//status per amount of processed data
        //TODO add number of calls to range_request
+       //TODO check stats for the amount of data processed (scanned, returned)
       m_aws_response_handler.init_stats_response();
       m_aws_response_handler.send_stats_response();
       m_aws_response_handler.init_end_response();
@@ -789,11 +795,11 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 }
 
 int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
-{
+{//purpose: to process the returned buffer from range-request, and to send it to the Parquet-reader.
+ //range_request() is called by arrow::ReadAt, and upon completion the control is back to execute()
     fp_chunked_transfer_encoding();
     size_t append_in_callback = 0;
     int part_no = 1;
-    parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size);
     //concat the requested buffer
     for (auto& it : bl.buffers()) {
       if (it.length() == 0) {
@@ -1042,7 +1048,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
   if (len == 0 && s->obj_size != 0) {
     return 0;
   }
-  if (m_parquet_type) {
+  if (m_parquet_type) {//bufferlist sendback upon range-request
     return parquet_processing(bl,ofs,len);
   }
   if (m_json_type) {