]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
json integration: identifying new data-source and execute json flow. 49411/head
authorgalsalomon66 <gal.salomon@gmail.com>
Tue, 13 Dec 2022 14:31:43 +0000 (16:31 +0200)
committergalsalomon66 <gal.salomon@gmail.com>
Thu, 2 Mar 2023 22:24:37 +0000 (00:24 +0200)
s3select submodule

integrate the limit-operator into RGW. current changes is for CSV format

integrate the limit-operator with CSV-flow, and JSON-flow. an update of s3select submodule

a fix for the input-serialization-type selection

debug functionality.

fix for error handling

adding the scan-range feature. i.e. enables the user to define the range of processing(text only)

remove the temp variable(the s3select-layer handles the sql-result setting upon JSON flow)

adding documentation. for JSON, SQL limit-operator, scan-range option.
a fix for JSON test framework in s3select module

investigate crash. replace len with it.length()

editorial changes; non-initialized variables

adding validation for offset&length. (it seems that the offset&length causes Invalid reads, in some cases)

changes related to trino integration.(1) syntax issues related to trino-engine statement conbersions(s3select parser handles that) (2).
(2). trino rejects(HIVE_CURSUR_ERROR) some of the s3select responses do to its size, the change is about control the size of the respond.

update JSON documentation

trino changes: response in paging (limitation on size), trino syntax issues.

Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
doc/radosgw/s3select.rst
src/rgw/rgw_s3select.cc
src/rgw/rgw_s3select_private.h
src/s3select

index edad5e88e678d87f41f870d19bc53312eb6bafcc..d4319ac7aa757e36fb2c6d22ad3e2f4333a7cfb5 100644 (file)
@@ -384,6 +384,12 @@ String functions
 
     | ``upper\lower`` : converts characters into lowercase/uppercase.
 
+SQL limit operator
+~~~~~~~~~~~~~~~~~~
+
+    | The SQL LIMIT operator is used to limit the number of rows processed by the query.
+    | Upon reaching the limit set by the user, the RGW stops fetching additional chunks.
+    | TODO : add examples, for aggregation and non-aggregation queries.
 
 Alias
 ~~~~~
@@ -442,7 +448,8 @@ Sending Query to RGW
 
  aws --endpoint-url http://localhost:8000 s3api select-object-content 
   --bucket {BUCKET-NAME}  
-  --expression-type 'SQL'     
+  --expression-type 'SQL'
+  --scan-range '{"Start" : 1000, "End" : 1000000}' 
   --input-serialization 
   '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' 
   --output-serialization '{"CSV": {"FieldDelimiter": ":", "RecordDelimiter":"\t", "QuoteFields": "ALWAYS"}}' 
@@ -496,6 +503,13 @@ Output Serialization
    | **FieldDelimiter** -> (string)
    | The value used to separate individual fields in a record. You can specify an arbitrary delimiter.
 
+scan range option
+~~~~~~~~~~~~~~~~~
+
+   | The scan range option is a part of AWS-CLI syntax, it enables to scan and process only the selected part of the object. 
+   | This option reduces the amount of IO operations (by skipping).
+   | TODO : different data-sources (CSV, JSON, Parquet)
+
 CSV parsing behavior
 --------------------
 
@@ -530,6 +544,94 @@ CSV parsing behavior
 |                                 | tag             | "**IGNORE**" value means to skip the first line                       |
 +---------------------------------+-----------------+-----------------------------------------------------------------------+       
 
+JSON
+--------------------
+
+         | a JSON reader has been integrated with the s3select-engine, which allows the client to use SQL statements to scan and extract information from JSON documents. 
+         | It should be noted that the data readers and parsers for CSV, Parquet, and JSON documents are separated from the SQL engine itself, so all of these readers use the same SQL engine.
+
+         | It's important to note that values in a JSON document can be nested in various ways, such as within objects or arrays.
+         | These objects and arrays can be nested within each other without any limitations.
+         | upon using SQL to query a specific value in a JSON document, the user needs to use a specific syntax to describe the location of the value.
+         | This is because the standard "select column from object" syntax will not work.
+         | Instead, the user must use a path in the SELECT statement to tell the JSON reader where the value is located.
+
+         | The SQL engine processes the SELECT statement in a row-based fashion.
+         | It uses the columns specified in the statement to perform its projection calculation, and each row contains values for these columns.
+         | In other words, the SQL engine processes each row one at a time(and aggregates results), using the values in the columns to perform its SQL calculations.
+         | However, the generic structure of a JSON document does not have a row-and-column structure like CSV or Parquet.
+         | Instead, it is the SQL statement itself that defines the rows and columns when querying a JSON document.
+
+         | Upon querying JSON documents using SQL, the FROM clause in the SELECT statement defines the row boundaries.
+         | a row in a JSON document should be similar to how the row delimiter is used to define rows when querying CSV objects, and how row groups are used to define rows when querying Parquet objects.
+         | The statement "SELECT ... FROM s3object[*].aaa.bb.cc" instructs the reader to search for the path "aaa.bb.cc" and defines the row boundaries based on the occurrence of this path.
+         | A row begins when the reader encounters the path, and it ends when the reader exits the innermost part of the path, which in this case is the object "cc".
+
+         | NOTE : The semantics of querying JSON document may change and may not be the same as the current methodology described.
+
+         | TODO : relevant example for object and array values.
+
+a JSON query example
+--------------------
+
+::
+
+ {
+  "firstName": "Joe",
+  "lastName": "Jackson",
+  "gender": "male",
+  "age": "twenty",
+  "address": {
+  "streetAddress": "101",
+  "city": "San Diego",
+  "state": "CA"
+  },
+
+  "firstName": "Joe_2",
+  "lastName": "Jackson_2",
+  "gender": "male",
+  "age": 21,
+  "address": {
+  "streetAddress": "101",
+  "city": "San Diego",
+  "state": "CA"
+  },
+
+  "phoneNumbers": [
+    { "type": "home1", "number": "734928_1","addr": 11 },
+    { "type": "home2", "number": "734928_2","addr": 22 },
+    { "type": "home3", "number": "734928_3","addr": 33 },
+    { "type": "home4", "number": "734928_4","addr": 44 },
+    { "type": "home5", "number": "734928_5","addr": 55 },
+    { "type": "home6", "number": "734928_6","addr": 66 },
+    { "type": "home7", "number": "734928_7","addr": 77 },
+    { "type": "home8", "number": "734928_8","addr": 88 },
+    { "type": "home9", "number": "734928_9","addr": 99 },
+    { "type": "home10", "number": "734928_10","addr": 100 }
+  ],
+
+  "key_after_array": "XXX",
+
+  "description" : {
+    "main_desc" : "value_1",
+    "second_desc" : "value_2"
+  }
+ }
+
+  # the from-clause define a single row.
+  # _1 points to root object level.
+  # _1.age appears twice in Documnet-row, the last value is used for the operation.  
+  query = "select _1.firstname,_1.key_after_array,_1.age+4,_1.description.main_desc,_1.description.second_desc from s3object[*];";
+  expected_result = Joe_2,XXX,25,value_1,value_2
+
+
+  # the from-clause points the phonenumbers array (it defines the _1)
+  # each element in phoneNumbers array define a row. 
+  # in this case each element is an object contains 3 keys/values.
+  # the query "can not access" values outside phonenumbers array, the query can access only values appears on _1.phonenumbers path.
+  query = "select cast(substring(_1.number,1,6) as int) *10 from s3object[*].phonenumbers where _1.type='home2';";
+  expected_result = 7349280  
+
 
 BOTO3
 -----
index 14cadb32c6b8841175de9478829bc36e2434d76e..3a11ddf839c6c989b71a4fe0cbdd551fbbfbd9ae 100644 (file)
@@ -36,7 +36,7 @@ uint64_t aws_response_handler::get_total_bytes_returned()
 
 void aws_response_handler::update_total_bytes_returned(uint64_t value)
 {
-  total_bytes_returned += value;
+  total_bytes_returned = value;
 }
 
 void aws_response_handler::push_header(const char* header_name, const char* header_value)
@@ -260,7 +260,12 @@ void aws_response_handler::send_stats_response()
 
 RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   m_buff_header(std::make_unique<char[]>(1000)),
+  m_scan_range_ind(false),
+  m_start_scan_sz(0),
+  m_end_scan_sz(0),
+  m_object_size_for_processing(0),
   m_parquet_type(false),
+  m_json_type(false),
   chunk_number(0)
 {
   set_get_data(true);
@@ -282,9 +287,27 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
     return 0;
   };
   fp_s3select_result_format = [this](std::string& result) {
+    fp_chunked_transfer_encoding();
     m_aws_response_handler.send_success_response();
     return 0;
   };
+
+  fp_debug_mesg = [&](const char* mesg){
+    ldpp_dout(this, 10) << mesg << dendl;
+  };
+
+  fp_chunked_transfer_encoding = [&](void){
+    if (chunk_number == 0) { 
+      if (op_ret < 0) { 
+        set_req_state_err(s, op_ret); 
+      } 
+      dump_errno(s); 
+      // Explicitly use chunked transfer encoding so that we can stream the result
+      // to the user without having to wait for the full length of it.
+      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING); 
+    } 
+    chunk_number++; 
+  };
 }
 
 RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
@@ -295,13 +318,11 @@ int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
   if(m_s3select_query.empty() == false) {
     return 0;
   }
-  if(s->object->get_name().find(".parquet") != std::string::npos) { //aws cli is missing the parquet
-#ifdef _ARROW_EXIST
-    m_parquet_type = true;
-#else
+#ifndef _ARROW_EXIST
+    m_parquet_type = false;
     ldpp_dout(this, 10) << "arrow library is not installed" << dendl;
 #endif
-  }
+  
   //retrieve s3-select query from payload
   bufferlist data;
   int ret;
@@ -325,7 +346,7 @@ int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
   return RGWGetObj_ObjStore_S3::get_params(y);
 }
 
-int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
+int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char* input, size_t input_length)
 {
   int status = 0;
   uint32_t length_before_processing, length_post_processing;
@@ -374,25 +395,28 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input,
   } else if(m_header_info.compare("USE")==0) {
     csv.use_header_info=true;
   }
+  //m_s3_csv_object.set_external_debug_system(fp_debug_mesg);
+  m_s3_csv_object.set_result_formatters(fp_s3select_result_format,fp_result_header_format);
   m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
-  m_aws_response_handler.init_response();
   if (s3select_syntax.get_error_description().empty() == false) {
     //error-flow (syntax-error)
     m_aws_response_handler.send_error_response(s3select_syntax_error,
         s3select_syntax.get_error_description().c_str(),
         s3select_resource_id);
-    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+    ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
+    ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
     return -1;
   } else {
     if (input == nullptr) {
       input = "";
     }
-    m_aws_response_handler.init_success_response();
-    length_before_processing = (m_aws_response_handler.get_sql_result()).size();
+    fp_result_header_format(m_aws_response_handler.get_sql_result());
+    length_before_processing = m_s3_csv_object.get_return_result_size();
     //query is correct(syntax), processing is starting.
-    status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, s->obj_size);
-    length_post_processing = (m_aws_response_handler.get_sql_result()).size();
-    m_aws_response_handler.update_total_bytes_returned(length_post_processing-length_before_processing);
+    status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, m_object_size_for_processing);
+    length_post_processing = m_s3_csv_object.get_return_result_size();
+    m_aws_response_handler.update_total_bytes_returned( m_s3_csv_object.get_return_result_size() );
+
     if (status < 0) {
       //error flow(processing-time)
       m_aws_response_handler.send_error_response(s3select_processTime_error,
@@ -401,24 +425,16 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input,
       ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
       return -1;
     }
-    if (chunk_number == 0) {
-      //success flow
-      if (op_ret < 0) {
-        set_req_state_err(s, op_ret);
-      }
-      dump_errno(s);
-      // Explicitly use chunked transfer encoding so that we can stream the result
-      // to the user without having to wait for the full length of it.
-      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
-    }
-    chunk_number++;
+
   }
-  if (length_post_processing-length_before_processing != 0) {
-    m_aws_response_handler.send_success_response();
+  if ((length_post_processing-length_before_processing) != 0) {
+    ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
+    ldpp_dout(this, 10) << "s3-select: sql-result{" << m_aws_response_handler.get_sql_result() << "}" << dendl;
   } else {
     m_aws_response_handler.send_continuation_response();
   }
   if (enable_progress == true) {
+    fp_chunked_transfer_encoding();
     m_aws_response_handler.init_progress_response();
     m_aws_response_handler.send_progress_response();
   }
@@ -430,8 +446,11 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   int status = 0;
 #ifdef _ARROW_EXIST
   if (!m_s3_parquet_object.is_set()) {
+    //parsing the SQL statement
     s3select_syntax.parse_query(m_sql_query.c_str());
+    //m_s3_parquet_object.set_external_debug_system(fp_debug_mesg);
     try {
+      //at this stage the Parquet-processing requires for the meta-data that reside on Parquet object 
       m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
     } catch(base_s3select_exception& e) {
       ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
@@ -442,6 +461,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
     }
   }
   if (s3select_syntax.get_error_description().empty() == false) {
+    //the SQL statement failed the syntax parser
     fp_result_header_format(m_aws_response_handler.get_sql_result());
     m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
     fp_s3select_result_format(m_aws_response_handler.get_sql_result());
@@ -449,6 +469,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
     status = -1;
   } else {
     fp_result_header_format(m_aws_response_handler.get_sql_result());
+    //at this stage the Parquet-processing "takes control", it keep calling to s3-range-request according to the SQL statement.
     status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
     if (status < 0) {
       m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
@@ -460,22 +481,114 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   return status;
 }
 
+int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char* input, size_t input_length)
+{
+  int status = 0;
+  
+  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
+  const char* s3select_syntax_error = "s3select-Syntax-Error";
+  const char* s3select_resource_id = "resourcse-id";
+  const char* s3select_json_error = "json-Format-Error";
+
+  m_aws_response_handler.init_response();
+
+  //the JSON data-type should be(currently) only DOCUMENT
+  if (m_json_datatype.compare("DOCUMENT") != 0) {
+    const char* s3select_json_error_msg = "s3-select query: wrong json dataType should use DOCUMENT; ";
+    m_aws_response_handler.send_error_response(s3select_json_error,
+      s3select_json_error_msg,
+      s3select_resource_id);
+    ldpp_dout(this, 10) << s3select_json_error_msg << dendl;
+    return -EINVAL;
+  } 
+
+  //parsing the SQL statement
+  s3select_syntax.parse_query(m_sql_query.c_str());
+  if (s3select_syntax.get_error_description().empty() == false) {
+  //SQL statement is wrong(syntax).
+    m_aws_response_handler.send_error_response(s3select_syntax_error,
+      s3select_syntax.get_error_description().c_str(),
+      s3select_resource_id);
+    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+    return -EINVAL;
+  }
+    
+  //initializing json processor
+  m_s3_json_object.set_json_query(&s3select_syntax);
+
+  if (input == nullptr) {
+    input = "";
+  }
+  m_aws_response_handler.init_success_response();
+  uint32_t length_before_processing = m_aws_response_handler.get_sql_result().size();
+  //query is correct(syntax), processing is starting.
+  try {
+    status = m_s3_json_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, m_object_size_for_processing);
+  } catch(base_s3select_exception& e) {
+    ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
+    m_aws_response_handler.get_sql_result().append(e.what());
+    m_aws_response_handler.send_error_response(s3select_processTime_error,
+       e.what(),
+       s3select_resource_id);
+    return -EINVAL;
+  }
+  uint32_t length_post_processing = m_aws_response_handler.get_sql_result().size();
+  m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
+  if (status < 0) {
+    //error flow(processing-time)
+    m_aws_response_handler.send_error_response(s3select_processTime_error,
+       m_s3_json_object.get_error_description().c_str(),
+       s3select_resource_id);
+    ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
+    return -EINVAL;
+  }
+  fp_chunked_transfer_encoding();
+
+  if (length_post_processing-length_before_processing != 0) {
+    m_aws_response_handler.send_success_response();
+  } else {
+    m_aws_response_handler.send_continuation_response();
+  }
+  if (enable_progress == true) {
+    m_aws_response_handler.init_progress_response();
+    m_aws_response_handler.send_progress_response();
+  }
+
+  return status;
+}
+
 int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
 {
   std::string input_tag{"InputSerialization"};
   std::string output_tag{"OutputSerialization"};
-  if(chunk_number !=0) {
+  if (chunk_number !=0) {
     return 0;
   }
 #define GT "&gt;"
 #define LT "&lt;"
+#define APOS "&apos;"
+
   if (m_s3select_query.find(GT) != std::string::npos) {
     boost::replace_all(m_s3select_query, GT, ">");
   }
   if (m_s3select_query.find(LT) != std::string::npos) {
     boost::replace_all(m_s3select_query, LT, "<");
   }
+  if (m_s3select_query.find(APOS) != std::string::npos) {
+    boost::replace_all(m_s3select_query, APOS, "'");
+  }
   //AWS cli s3select parameters
+  if (m_s3select_query.find(input_tag+"><CSV") != std::string::npos) {
+    ldpp_dout(this, 10) << "s3select: engine is set to process CSV objects" << dendl;
+  }
+  else if (m_s3select_query.find(input_tag+"><JSON") != std::string::npos) {
+    m_json_type=true;
+    ldpp_dout(this, 10) << "s3select: engine is set to process JSON objects" << dendl;
+  } else if (m_s3select_query.find(input_tag+"><Parquet") != std::string::npos) {
+    m_parquet_type=true;
+    ldpp_dout(this, 10) << "s3select: engine is set to process Parquet objects" << dendl;
+  }
+
   extract_by_tag(m_s3select_query, "Expression", sql_query);
   extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
   size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
@@ -485,9 +598,10 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
   extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
   extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
   extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
+  extract_by_tag(m_s3select_input, "Type", m_json_datatype);
   if (m_row_delimiter.size()==0) {
     m_row_delimiter='\n';
-  } else if(m_row_delimiter.compare("&#10;") == 0) {
+  } else if (m_row_delimiter.compare("&#10;") == 0) {
     //presto change
     m_row_delimiter='\n';
   }
@@ -503,7 +617,7 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
   extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
   if (output_row_delimiter.size()==0) {
     output_row_delimiter='\n';
-  } else if(output_row_delimiter.compare("&#10;") == 0) {
+  } else if (output_row_delimiter.compare("&#10;") == 0) {
     //presto change
     output_row_delimiter='\n';
   }
@@ -511,6 +625,19 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
     ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl;
     return -1;
   }
+  extract_by_tag(m_s3select_query, "Start", m_start_scan);
+  extract_by_tag(m_s3select_query, "End", m_end_scan);
+  if (m_start_scan.size() || m_end_scan.size()) {
+    m_scan_range_ind = true;
+    if (m_start_scan.size()) {
+      m_start_scan_sz = std::stol(m_start_scan);
+    }
+    if (m_end_scan.size()) {
+      m_end_scan_sz = std::stol(m_end_scan);
+    } else {
+      m_end_scan_sz = std::numeric_limits<std::int64_t>::max();
+    } 
+  }
   return 0;
 }
 
@@ -547,7 +674,9 @@ int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff
   m_request_range = len;
   ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
   RGWGetObj::execute(y);
-  memcpy(buff, requested_buffer.data(), len);
+  if (buff) {
+    memcpy(buff, requested_buffer.data(), len);
+  }
   ldout(s->cct, 10) << "S3select: done waiting, buffer is complete buffer-size:" << requested_buffer.size() << dendl;
   return len;
 }
@@ -565,7 +694,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
   if (m_parquet_type) {
     //parquet processing
     range_request(0, 4, parquet_magic, y);
-    if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) {
+    if (memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) {
       ldout(s->cct, 10) << s->object->get_name() << " does not contain parquet magic" << dendl;
       op_ret = -ERR_INVALID_REQUEST;
       return;
@@ -578,31 +707,25 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     } else {
       ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
     }
-  } else {
-    //CSV processing
-    RGWGetObj::execute(y);
-  }
+    } else { 
+       //CSV or JSON processing
+       if (m_scan_range_ind) {
+         //scan-range
+         range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y);
+       } else {
+         RGWGetObj::execute(y);
+       }
+  }//if (m_parquet_type)
 }
 
 int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
 {
-    if (chunk_number == 0) {
-      if (op_ret < 0) {
-        set_req_state_err(s, op_ret);
-      }
-      dump_errno(s);
-    }
-    // Explicitly use chunked transfer encoding so that we can stream the result
-    // to the user without having to wait for the full length of it.
-    if (chunk_number == 0) {
-      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
-    }
-    chunk_number++;
+    fp_chunked_transfer_encoding();
     size_t append_in_callback = 0;
     int part_no = 1;
     //concat the requested buffer
     for (auto& it : bl.buffers()) {
-      if(it.length() == 0) {
+      if (it.length() == 0) {
         ldout(s->cct, 10) << "S3select: get zero-buffer while appending request-buffer " << dendl;
       }
       append_in_callback += it.length();
@@ -623,49 +746,134 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
-  
-  if (s->obj_size == 0) {
-    status = run_s3select(m_sql_query.c_str(), nullptr, 0);
+  if (s->obj_size == 0 || m_object_size_for_processing == 0) {
+    status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0);
+    if (status<0){
+      return -EINVAL;
+    }
   } else {
     auto bl_len = bl.get_num_buffers();
     int i=0;
     for(auto& it : bl.buffers()) {
       ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
-                          << " len " << len << " obj-size " << s->obj_size << dendl;
-      if(it.length() == 0) {
+                          << " len " << len << " obj-size " << m_object_size_for_processing << dendl;
+      if (it.length() == 0 || len == 0) {
         ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
-                            <<  " obj-size " << s->obj_size << dendl;
+                            <<  " obj-size " << m_object_size_for_processing << dendl;
         continue;
       }
+      //NOTE: the it.length() must be used (not len)
       m_aws_response_handler.update_processed_size(it.length());
-      status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
-      if(status<0) {
+
+      if((ofs + len) > it.length()){
+       ldpp_dout(this, 10) << "offset and lenghth may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+       ofs = 0;
+       len = it.length();
+      }
+      status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] +ofs, len);
+      if (status<0) {
+       return -EINVAL;
+      }
+      if (m_s3_csv_object.is_sql_limit_reached()) {
+       break;
+      }
+      i++;
+    }
+  }
+  if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
+    if (status >=0) {
+      m_aws_response_handler.init_stats_response();
+      m_aws_response_handler.send_stats_response();
+      m_aws_response_handler.init_end_response();
+    }
+    if (m_s3_csv_object.is_sql_limit_reached()) {
+    //stop fetching chunks
+    ldpp_dout(this, 10) << "s3select : reached the limit :" << m_aws_response_handler.get_processed_size()  << dendl;
+    status = -ENOENT;
+    }
+  }
+  return status;
+}
+
+int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t len)
+{
+  int status = 0;
+  
+  if (s->obj_size == 0 || m_object_size_for_processing == 0) {
+    //in case of empty object the s3select-function returns a correct "empty" result(for aggregation and non-aggregation queries).
+    status = run_s3select_on_json(m_sql_query.c_str(), nullptr, 0);
+    if (status<0)
+      return -EINVAL;
+  } else {
+    //loop on buffer-list(chunks)
+    auto bl_len = bl.get_num_buffers();
+    int i=0;
+    for(auto& it : bl.buffers()) {
+      ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+                          << " len " << len << " obj-size " << m_object_size_for_processing << dendl;
+      //skipping the empty chunks
+      if (len == 0) {
+        ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
+                            <<  " obj-size " << m_object_size_for_processing << dendl;
+        continue;
+      }
+      m_aws_response_handler.update_processed_size(len);
+      status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0], len);
+      if (status<0) {
+       status = -EINVAL;
         break;
       }
+      if (m_s3_json_object.is_sql_limit_reached()) {
+       break;
+      }
       i++;
     }
   }
-  if (m_aws_response_handler.get_processed_size() == s->obj_size) {
+
+  if (status>=0 && (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_json_object.is_sql_limit_reached())) {
+    //flush the internal JSON buffer(upon last chunk)
+    status = run_s3select_on_json(m_sql_query.c_str(), nullptr, 0);
+    if (status<0) {
+      return -EINVAL;
+    }
     if (status >=0) {
       m_aws_response_handler.init_stats_response();
       m_aws_response_handler.send_stats_response();
       m_aws_response_handler.init_end_response();
     }
+    if (m_s3_json_object.is_sql_limit_reached()){
+      //stop fetching chunks
+      status = -ENOENT;
+      ldpp_dout(this, 10) << "s3select : reached the limit :" << m_aws_response_handler.get_processed_size()  << dendl;
+    }
   }
   return status;
 }
 
 int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
 {
+  if (m_scan_range_ind == false){
+    m_object_size_for_processing = s->obj_size;
+  }
+  if (m_scan_range_ind == true){
+      if (m_end_scan_sz == -1){
+               m_end_scan_sz = s->obj_size;
+      }
+      m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+  }
   if (!m_aws_response_handler.is_set()) {
     m_aws_response_handler.set(s, this);
   }
-  if(len == 0 && s->obj_size != 0) {
+  if (len == 0 && s->obj_size != 0) {
     return 0;
   }
   if (m_parquet_type) {
     return parquet_processing(bl,ofs,len);
   }
+  if (m_json_type) {
+    return json_processing(bl,ofs,len);
+  }
   return csv_processing(bl,ofs,len);
 }
 
index eb17050bc153974540fd236c3fe158debee0002a..7ac9167caee4fa0f75c9236164f2329788f637bd 100644 (file)
@@ -169,6 +169,7 @@ private:
 #ifdef _ARROW_EXIST
   s3selectEngine::parquet_object m_s3_parquet_object;
 #endif
+  s3selectEngine::json_object m_s3_json_object;
   std::string m_column_delimiter;
   std::string m_quot;
   std::string m_row_delimiter;
@@ -183,11 +184,20 @@ private:
   std::string output_escape_char;
   std::string output_quote_fields;
   std::string output_row_delimiter;
+  std::string m_start_scan;
+  std::string m_end_scan;
+  bool m_scan_range_ind;
+  int64_t m_start_scan_sz;
+  int64_t m_end_scan_sz;
+  int64_t m_object_size_for_processing;
   aws_response_handler m_aws_response_handler;
   bool enable_progress;
 
   //parquet request
   bool m_parquet_type;
+  //json request
+  std::string m_json_datatype;
+  bool m_json_type;
 #ifdef _ARROW_EXIST
   s3selectEngine::rgw_s3select_api m_rgw_api;
 #endif
@@ -197,6 +207,8 @@ private:
   std::string range_req_str;
   std::function<int(std::string&)> fp_result_header_format;
   std::function<int(std::string&)> fp_s3select_result_format;
+  std::function<void(const char*)> fp_debug_mesg;
+  std::function<void(void)> fp_chunked_transfer_encoding;
   int m_header_size;
 
 public:
@@ -217,10 +229,14 @@ private:
 
   int parquet_processing(bufferlist& bl, off_t ofs, off_t len);
 
-  int run_s3select(const char* query, const char* input, size_t input_length);
+  int json_processing(bufferlist& bl, off_t ofs, off_t len);
+
+  int run_s3select_on_csv(const char* query, const char* input, size_t input_length);
 
   int run_s3select_on_parquet(const char* query);
 
+  int run_s3select_on_json(const char* query, const char* input, size_t input_length);
+
   int extract_by_tag(std::string input, std::string tag_name, std::string& result);
 
   void convert_escape_seq(std::string& esc);
index 757184fdcb857612ce83707119c5960f3a3af5b5..874752f9cc931b2b0af52e96bcc147e0239faa14 160000 (submodule)
@@ -1 +1 @@
-Subproject commit 757184fdcb857612ce83707119c5960f3a3af5b5
+Subproject commit 874752f9cc931b2b0af52e96bcc147e0239faa14