| ``upper\lower`` : converts characters to uppercase/lowercase.
+SQL limit operator
+~~~~~~~~~~~~~~~~~~
+
+ | The SQL LIMIT operator limits the number of rows processed by the query.
+ | Upon reaching the limit set by the user, the RGW stops fetching additional chunks (see the illustrative queries below).
+ | TODO : add examples, for aggregation and non-aggregation queries.
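+ | Illustrative sketch only (not an official example; it assumes a CSV object addressed as s3object with columns _1 and _2):
+
+::
+
+ # non-aggregation query: at most 5 rows are returned
+ select _1,_2 from s3object limit 5;
+
+ # aggregation query: the aggregate is computed over the first 5 processed rows only
+ select count(0) from s3object limit 5;
+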
Alias
~~~~~
aws --endpoint-url http://localhost:8000 s3api select-object-content
--bucket {BUCKET-NAME}
- --expression-type 'SQL'
+ --expression-type 'SQL'
+ --scan-range '{"Start" : 1000, "End" : 1000000}'
--input-serialization
'{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}'
--output-serialization '{"CSV": {"FieldDelimiter": ":", "RecordDelimiter":"\t", "QuoteFields": "ALWAYS"}}'
| **FieldDelimiter** -> (string)
| The value used to separate individual fields in a record. You can specify an arbitrary delimiter.
+scan range option
+~~~~~~~~~~~~~~~~~
+
+ | The scan range option is part of the AWS-CLI syntax; it enables scanning and processing only the selected part of the object (see the example below).
+ | The Start and End values are byte offsets within the object, so this option reduces the number of IO operations by skipping the bytes outside the requested range.
+ | TODO : different data-sources (CSV, JSON, Parquet)
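+
+ | For example, the option as it appears in the AWS-CLI command above (the byte offsets are illustrative):
+
+::
+
+ --scan-range '{"Start" : 1000, "End" : 1000000}'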
+
CSV parsing behavior
--------------------
| | tag | "**IGNORE**" value means to skip the first line |
+---------------------------------+-----------------+-----------------------------------------------------------------------+
+JSON
+--------------------
+
+ | A JSON reader has been integrated into the s3select-engine, which allows the client to use SQL statements to scan and extract information from JSON documents.
+ | Note that the data readers and parsers for CSV, Parquet, and JSON documents are separate from the SQL engine itself, so all of these readers use the same SQL engine.
+
+ | It's important to note that values in a JSON document can be nested in various ways, such as within objects or arrays.
+ | These objects and arrays can be nested within each other without any limitations.
+ | When using SQL to query a specific value in a JSON document, the user needs to use a specific syntax to describe the location of the value.
+ | This is because the standard "select column from object" syntax will not work.
+ | Instead, the user must use a path in the SELECT statement to tell the JSON reader where the value is located.
+
+ | The SQL engine processes the SELECT statement in a row-based fashion.
+ | It uses the columns specified in the statement to perform its projection calculation, and each row contains values for these columns.
+ | In other words, the SQL engine processes each row one at a time (and aggregates results), using the values in the columns to perform its SQL calculations.
+ | However, the generic structure of a JSON document does not have a row-and-column structure like CSV or Parquet.
+ | Instead, it is the SQL statement itself that defines the rows and columns when querying a JSON document.
+
+ | When querying JSON documents using SQL, the FROM clause in the SELECT statement defines the row boundaries.
+ | This is similar to the way the row delimiter defines rows when querying CSV objects, and the way row groups define rows when querying Parquet objects.
+ | The statement "SELECT ... FROM s3object[*].aaa.bb.cc" instructs the reader to search for the path "aaa.bb.cc" and defines the row boundaries based on the occurrence of this path.
+ | A row begins when the reader encounters the path, and it ends when the reader exits the innermost part of the path, which in this case is the object "cc".
+
+ | NOTE : The semantics of querying JSON documents may change and may differ from the methodology described here.
+
+ | TODO : relevant example for object and array values.
+
+a JSON query example
+--------------------
+
+::
+
+ {
+ "firstName": "Joe",
+ "lastName": "Jackson",
+ "gender": "male",
+ "age": "twenty",
+ "address": {
+ "streetAddress": "101",
+ "city": "San Diego",
+ "state": "CA"
+ },
+
+ "firstName": "Joe_2",
+ "lastName": "Jackson_2",
+ "gender": "male",
+ "age": 21,
+ "address": {
+ "streetAddress": "101",
+ "city": "San Diego",
+ "state": "CA"
+ },
+
+ "phoneNumbers": [
+ { "type": "home1", "number": "734928_1","addr": 11 },
+ { "type": "home2", "number": "734928_2","addr": 22 },
+ { "type": "home3", "number": "734928_3","addr": 33 },
+ { "type": "home4", "number": "734928_4","addr": 44 },
+ { "type": "home5", "number": "734928_5","addr": 55 },
+ { "type": "home6", "number": "734928_6","addr": 66 },
+ { "type": "home7", "number": "734928_7","addr": 77 },
+ { "type": "home8", "number": "734928_8","addr": 88 },
+ { "type": "home9", "number": "734928_9","addr": 99 },
+ { "type": "home10", "number": "734928_10","addr": 100 }
+ ],
+
+ "key_after_array": "XXX",
+
+ "description" : {
+ "main_desc" : "value_1",
+ "second_desc" : "value_2"
+ }
+ }
+
+ # the from-clause defines a single row.
+ # _1 points to the root object level.
+ # _1.age appears twice in the document-row; the last value is used for the operation.
+ query = "select _1.firstname,_1.key_after_array,_1.age+4,_1.description.main_desc,_1.description.second_desc from s3object[*];";
+ expected_result = Joe_2,XXX,25,value_1,value_2
+
+
+ # the from-clause points to the phoneNumbers array (it defines _1)
+ # each element in the phoneNumbers array defines a row.
+ # in this case each element is an object containing 3 keys/values.
+ # the query cannot access values outside the phoneNumbers array; it can access only values that appear on the _1.phonenumbers path.
+ query = "select cast(substring(_1.number,1,6) as int) *10 from s3object[*].phonenumbers where _1.type='home2';";
+ expected_result = 7349280
+
BOTO3
-----
void aws_response_handler::update_total_bytes_returned(uint64_t value)
{
- total_bytes_returned += value;
+ total_bytes_returned = value;
}
void aws_response_handler::push_header(const char* header_name, const char* header_value)
RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
m_buff_header(std::make_unique<char[]>(1000)),
+ m_scan_range_ind(false),
+ m_start_scan_sz(0),
+ m_end_scan_sz(0),
+ m_object_size_for_processing(0),
m_parquet_type(false),
+ m_json_type(false),
chunk_number(0)
{
set_get_data(true);
return 0;
};
fp_s3select_result_format = [this](std::string& result) {
+ fp_chunked_transfer_encoding();
m_aws_response_handler.send_success_response();
return 0;
};
+
+ fp_debug_mesg = [&](const char* mesg){
+ ldpp_dout(this, 10) << mesg << dendl;
+ };
+
+ fp_chunked_transfer_encoding = [&](void){
+ if (chunk_number == 0) {
+ if (op_ret < 0) {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ // Explicitly use chunked transfer encoding so that we can stream the result
+ // to the user without having to wait for the full length of it.
+ end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+ }
+ chunk_number++;
+ };
}
RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
if(m_s3select_query.empty() == false) {
return 0;
}
- if(s->object->get_name().find(".parquet") != std::string::npos) { //aws cli is missing the parquet
-#ifdef _ARROW_EXIST
- m_parquet_type = true;
-#else
+#ifndef _ARROW_EXIST
+ m_parquet_type = false;
ldpp_dout(this, 10) << "arrow library is not installed" << dendl;
#endif
- }
+
//retrieve s3-select query from payload
bufferlist data;
int ret;
return RGWGetObj_ObjStore_S3::get_params(y);
}
-int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
+int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char* input, size_t input_length)
{
int status = 0;
uint32_t length_before_processing, length_post_processing;
} else if(m_header_info.compare("USE")==0) {
csv.use_header_info=true;
}
+ //m_s3_csv_object.set_external_debug_system(fp_debug_mesg);
+ m_s3_csv_object.set_result_formatters(fp_s3select_result_format,fp_result_header_format);
m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
- m_aws_response_handler.init_response();
if (s3select_syntax.get_error_description().empty() == false) {
//error-flow (syntax-error)
m_aws_response_handler.send_error_response(s3select_syntax_error,
s3select_syntax.get_error_description().c_str(),
s3select_resource_id);
- ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+ ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
+ ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
return -1;
} else {
if (input == nullptr) {
input = "";
}
- m_aws_response_handler.init_success_response();
- length_before_processing = (m_aws_response_handler.get_sql_result()).size();
+ fp_result_header_format(m_aws_response_handler.get_sql_result());
+ length_before_processing = m_s3_csv_object.get_return_result_size();
//query is correct(syntax), processing is starting.
- status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, s->obj_size);
- length_post_processing = (m_aws_response_handler.get_sql_result()).size();
- m_aws_response_handler.update_total_bytes_returned(length_post_processing-length_before_processing);
+ status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, m_object_size_for_processing);
+ length_post_processing = m_s3_csv_object.get_return_result_size();
+ m_aws_response_handler.update_total_bytes_returned( m_s3_csv_object.get_return_result_size() );
+
if (status < 0) {
//error flow(processing-time)
m_aws_response_handler.send_error_response(s3select_processTime_error,
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
return -1;
}
- if (chunk_number == 0) {
- //success flow
- if (op_ret < 0) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- // Explicitly use chunked transfer encoding so that we can stream the result
- // to the user without having to wait for the full length of it.
- end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
- }
- chunk_number++;
+
}
- if (length_post_processing-length_before_processing != 0) {
- m_aws_response_handler.send_success_response();
+ if ((length_post_processing-length_before_processing) != 0) {
+ ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
+ ldpp_dout(this, 10) << "s3-select: sql-result{" << m_aws_response_handler.get_sql_result() << "}" << dendl;
} else {
m_aws_response_handler.send_continuation_response();
}
if (enable_progress == true) {
+ fp_chunked_transfer_encoding();
m_aws_response_handler.init_progress_response();
m_aws_response_handler.send_progress_response();
}
int status = 0;
#ifdef _ARROW_EXIST
if (!m_s3_parquet_object.is_set()) {
+ //parsing the SQL statement
s3select_syntax.parse_query(m_sql_query.c_str());
+ //m_s3_parquet_object.set_external_debug_system(fp_debug_mesg);
try {
+ //at this stage the Parquet processing requires the meta-data that resides in the Parquet object
m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
} catch(base_s3select_exception& e) {
ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
}
}
if (s3select_syntax.get_error_description().empty() == false) {
+ //the SQL statement failed the syntax parser
fp_result_header_format(m_aws_response_handler.get_sql_result());
m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
fp_s3select_result_format(m_aws_response_handler.get_sql_result());
status = -1;
} else {
fp_result_header_format(m_aws_response_handler.get_sql_result());
+ //at this stage the Parquet-processing "takes control", it keep calling to s3-range-request according to the SQL statement.
status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
if (status < 0) {
m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
return status;
}
+int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char* input, size_t input_length)
+{
+ int status = 0;
+
+ const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
+ const char* s3select_syntax_error = "s3select-Syntax-Error";
+ const char* s3select_resource_id = "resourcse-id";
+ const char* s3select_json_error = "json-Format-Error";
+
+ m_aws_response_handler.init_response();
+
+ //currently the only supported JSON data-type is DOCUMENT
+ if (m_json_datatype.compare("DOCUMENT") != 0) {
+ const char* s3select_json_error_msg = "s3-select query: wrong json dataType should use DOCUMENT; ";
+ m_aws_response_handler.send_error_response(s3select_json_error,
+ s3select_json_error_msg,
+ s3select_resource_id);
+ ldpp_dout(this, 10) << s3select_json_error_msg << dendl;
+ return -EINVAL;
+ }
+
+ //parsing the SQL statement
+ s3select_syntax.parse_query(m_sql_query.c_str());
+ if (s3select_syntax.get_error_description().empty() == false) {
+ //SQL statement is wrong (syntax error).
+ m_aws_response_handler.send_error_response(s3select_syntax_error,
+ s3select_syntax.get_error_description().c_str(),
+ s3select_resource_id);
+ ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+ return -EINVAL;
+ }
+
+ //initializing json processor
+ m_s3_json_object.set_json_query(&s3select_syntax);
+
+ if (input == nullptr) {
+ input = "";
+ }
+ m_aws_response_handler.init_success_response();
+ uint32_t length_before_processing = m_aws_response_handler.get_sql_result().size();
+ //query is correct(syntax), processing is starting.
+ try {
+ status = m_s3_json_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, m_object_size_for_processing);
+ } catch(base_s3select_exception& e) {
+ ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
+ m_aws_response_handler.get_sql_result().append(e.what());
+ m_aws_response_handler.send_error_response(s3select_processTime_error,
+ e.what(),
+ s3select_resource_id);
+ return -EINVAL;
+ }
+ uint32_t length_post_processing = m_aws_response_handler.get_sql_result().size();
+ m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
+ if (status < 0) {
+ //error flow(processing-time)
+ m_aws_response_handler.send_error_response(s3select_processTime_error,
+ m_s3_json_object.get_error_description().c_str(),
+ s3select_resource_id);
+ ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
+ return -EINVAL;
+ }
+ fp_chunked_transfer_encoding();
+
+ if (length_post_processing-length_before_processing != 0) {
+ m_aws_response_handler.send_success_response();
+ } else {
+ m_aws_response_handler.send_continuation_response();
+ }
+ if (enable_progress == true) {
+ m_aws_response_handler.init_progress_response();
+ m_aws_response_handler.send_progress_response();
+ }
+
+ return status;
+}
+
int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
{
std::string input_tag{"InputSerialization"};
std::string output_tag{"OutputSerialization"};
- if(chunk_number !=0) {
+ if (chunk_number !=0) {
return 0;
}
#define GT "&gt;"
#define LT "&lt;"
+#define APOS "&apos;"
+
if (m_s3select_query.find(GT) != std::string::npos) {
boost::replace_all(m_s3select_query, GT, ">");
}
if (m_s3select_query.find(LT) != std::string::npos) {
boost::replace_all(m_s3select_query, LT, "<");
}
+ if (m_s3select_query.find(APOS) != std::string::npos) {
+ boost::replace_all(m_s3select_query, APOS, "'");
+ }
//AWS cli s3select parameters
+ if (m_s3select_query.find(input_tag+"><CSV") != std::string::npos) {
+ ldpp_dout(this, 10) << "s3select: engine is set to process CSV objects" << dendl;
+ }
+ else if (m_s3select_query.find(input_tag+"><JSON") != std::string::npos) {
+ m_json_type=true;
+ ldpp_dout(this, 10) << "s3select: engine is set to process JSON objects" << dendl;
+ } else if (m_s3select_query.find(input_tag+"><Parquet") != std::string::npos) {
+ m_parquet_type=true;
+ ldpp_dout(this, 10) << "s3select: engine is set to process Parquet objects" << dendl;
+ }
+
extract_by_tag(m_s3select_query, "Expression", sql_query);
extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
+ extract_by_tag(m_s3select_input, "Type", m_json_datatype);
if (m_row_delimiter.size()==0) {
m_row_delimiter='\n';
- } else if(m_row_delimiter.compare(" ") == 0) {
+ } else if (m_row_delimiter.compare(" ") == 0) {
//presto change
m_row_delimiter='\n';
}
extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
if (output_row_delimiter.size()==0) {
output_row_delimiter='\n';
- } else if(output_row_delimiter.compare(" ") == 0) {
+ } else if (output_row_delimiter.compare(" ") == 0) {
//presto change
output_row_delimiter='\n';
}
ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl;
return -1;
}
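+ //extract the optional scan-range boundaries (Start/End byte offsets); a missing End value defaults to the maximal int64 value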
+ extract_by_tag(m_s3select_query, "Start", m_start_scan);
+ extract_by_tag(m_s3select_query, "End", m_end_scan);
+ if (m_start_scan.size() || m_end_scan.size()) {
+ m_scan_range_ind = true;
+ if (m_start_scan.size()) {
+ m_start_scan_sz = std::stol(m_start_scan);
+ }
+ if (m_end_scan.size()) {
+ m_end_scan_sz = std::stol(m_end_scan);
+ } else {
+ m_end_scan_sz = std::numeric_limits<std::int64_t>::max();
+ }
+ }
return 0;
}
m_request_range = len;
ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
RGWGetObj::execute(y);
- memcpy(buff, requested_buffer.data(), len);
+ if (buff) {
+ memcpy(buff, requested_buffer.data(), len);
+ }
ldout(s->cct, 10) << "S3select: done waiting, buffer is complete buffer-size:" << requested_buffer.size() << dendl;
return len;
}
if (m_parquet_type) {
//parquet processing
range_request(0, 4, parquet_magic, y);
- if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) {
+ if (memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) {
ldout(s->cct, 10) << s->object->get_name() << " does not contain parquet magic" << dendl;
op_ret = -ERR_INVALID_REQUEST;
return;
} else {
ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
}
- } else {
- //CSV processing
- RGWGetObj::execute(y);
- }
+ } else {
+ //CSV or JSON processing
+ if (m_scan_range_ind) {
+ //scan-range
+ range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y);
+ } else {
+ RGWGetObj::execute(y);
+ }
+ }//if (m_parquet_type)
}
int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
{
- if (chunk_number == 0) {
- if (op_ret < 0) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- }
- // Explicitly use chunked transfer encoding so that we can stream the result
- // to the user without having to wait for the full length of it.
- if (chunk_number == 0) {
- end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
- }
- chunk_number++;
+ fp_chunked_transfer_encoding();
size_t append_in_callback = 0;
int part_no = 1;
//concat the requested buffer
for (auto& it : bl.buffers()) {
- if(it.length() == 0) {
+ if (it.length() == 0) {
ldout(s->cct, 10) << "S3select: get zero-buffer while appending request-buffer " << dendl;
}
append_in_callback += it.length();
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
int status = 0;
-
- if (s->obj_size == 0) {
- status = run_s3select(m_sql_query.c_str(), nullptr, 0);
+
+ if (s->obj_size == 0 || m_object_size_for_processing == 0) {
+ status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0);
+ if (status<0){
+ return -EINVAL;
+ }
} else {
auto bl_len = bl.get_num_buffers();
int i=0;
for(auto& it : bl.buffers()) {
ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
- << " len " << len << " obj-size " << s->obj_size << dendl;
- if(it.length() == 0) {
+ << " len " << len << " obj-size " << m_object_size_for_processing << dendl;
+ if (it.length() == 0 || len == 0) {
ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
- << " obj-size " << s->obj_size << dendl;
+ << " obj-size " << m_object_size_for_processing << dendl;
continue;
}
+ //NOTE: the it.length() must be used (not len)
m_aws_response_handler.update_processed_size(it.length());
- status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
- if(status<0) {
+
+ if ((ofs + len) > it.length()) {
+ ldpp_dout(this, 10) << "offset and length may cause an invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+ ofs = 0;
+ len = it.length();
+ }
+ status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len);
+ if (status<0) {
+ return -EINVAL;
+ }
+ if (m_s3_csv_object.is_sql_limit_reached()) {
+ break;
+ }
+ i++;
+ }
+ }
+ if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
+ if (status >=0) {
+ m_aws_response_handler.init_stats_response();
+ m_aws_response_handler.send_stats_response();
+ m_aws_response_handler.init_end_response();
+ }
+ if (m_s3_csv_object.is_sql_limit_reached()) {
+ //stop fetching chunks
+ ldpp_dout(this, 10) << "s3select : reached the limit :" << m_aws_response_handler.get_processed_size() << dendl;
+ status = -ENOENT;
+ }
+ }
+ return status;
+}
+
+int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t len)
+{
+ int status = 0;
+
+ if (s->obj_size == 0 || m_object_size_for_processing == 0) {
+ //in case of an empty object the s3select function returns a correct "empty" result (for aggregation and non-aggregation queries).
+ status = run_s3select_on_json(m_sql_query.c_str(), nullptr, 0);
+ if (status<0) {
+ return -EINVAL;
+ }
+ } else {
+ //loop on buffer-list(chunks)
+ auto bl_len = bl.get_num_buffers();
+ int i=0;
+ for(auto& it : bl.buffers()) {
+ ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+ << " len " << len << " obj-size " << m_object_size_for_processing << dendl;
+ //skipping the empty chunks
+ if (len == 0) {
+ ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
+ << " obj-size " << m_object_size_for_processing << dendl;
+ continue;
+ }
+ m_aws_response_handler.update_processed_size(len);
+ status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0], len);
+ if (status<0) {
+ status = -EINVAL;
break;
}
+ if (m_s3_json_object.is_sql_limit_reached()) {
+ break;
+ }
i++;
}
}
- if (m_aws_response_handler.get_processed_size() == s->obj_size) {
+
+ if (status>=0 && (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_json_object.is_sql_limit_reached())) {
+ //flush the internal JSON buffer(upon last chunk)
+ status = run_s3select_on_json(m_sql_query.c_str(), nullptr, 0);
+ if (status<0) {
+ return -EINVAL;
+ }
if (status >=0) {
m_aws_response_handler.init_stats_response();
m_aws_response_handler.send_stats_response();
m_aws_response_handler.init_end_response();
}
+ if (m_s3_json_object.is_sql_limit_reached()){
+ //stop fetching chunks
+ status = -ENOENT;
+ ldpp_dout(this, 10) << "s3select : reached the limit :" << m_aws_response_handler.get_processed_size() << dendl;
+ }
}
return status;
}
int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
{
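+ //determine the size of the data to process: the full object, or the scan-range length when a scan-range was requested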
+ if (m_scan_range_ind == false) {
+ m_object_size_for_processing = s->obj_size;
+ }
+ if (m_scan_range_ind == true) {
+ if (m_end_scan_sz == -1) {
+ m_end_scan_sz = s->obj_size;
+ }
+ m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+ }
if (!m_aws_response_handler.is_set()) {
m_aws_response_handler.set(s, this);
}
- if(len == 0 && s->obj_size != 0) {
+ if (len == 0 && s->obj_size != 0) {
return 0;
}
if (m_parquet_type) {
return parquet_processing(bl,ofs,len);
}
+ if (m_json_type) {
+ return json_processing(bl,ofs,len);
+ }
return csv_processing(bl,ofs,len);
}