m_object_size_for_processing(0),
m_parquet_type(false),
m_json_type(false),
+ m_outputFormat(OutputFormat::CSV),
chunk_number(0),
m_requested_range(0),
m_scan_offset(1024),
int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
{
- if(m_s3select_query.empty() == false) {
+ if (m_s3select_query.empty() == false) {
return 0;
}
#ifndef _ARROW_EXIST
if (output_escape_char.size()) {
csv.output_escape_char = *output_escape_char.c_str();
}
- if(output_quote_fields.compare("ALWAYS") == 0) {
+ if (output_quote_fields.compare("ALWAYS") == 0) {
csv.quote_fields_always = true;
- } else if(output_quote_fields.compare("ASNEEDED") == 0) {
+ } else if (output_quote_fields.compare("ASNEEDED") == 0) {
csv.quote_fields_asneeded = true;
}
- if(m_header_info.compare("IGNORE")==0) {
+ if (m_header_info.compare("IGNORE")==0) {
csv.ignore_header_info=true;
- } else if(m_header_info.compare("USE")==0) {
+ } else if (m_header_info.compare("USE")==0) {
csv.use_header_info=true;
}
-
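+ //when the request asks for JSON output-serialization, flag the CSV engine to emit JSON-formatted results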
+ if (m_outputFormat == OutputFormat::JSON) {
+ csv.output_json_format = true;
+ }
m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
if (!m_s3_parquet_object.is_set()) {
//parsing the SQL statement.
s3select_syntax.parse_query(m_sql_query.c_str());
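+ //build the parquet output definitions and propagate the requested output format to the parquet engine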
+ parquet_object::csv_definitions parquet;
+ if (m_outputFormat == OutputFormat::JSON) {
+ parquet.output_json_format = true;
+ }
m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
fp_s3select_result_format,
try {
//at this stage the Parquet processing requires the meta-data that resides on the Parquet object
- m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
+ m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet);
} catch(base_s3select_exception& e) {
ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
fp_result_header_format(m_aws_response_handler.get_sql_result());
fp_s3select_result_format,
fp_result_header_format,
fp_debug_mesg);
+ json_object::csv_definitions json;
m_aws_response_handler.init_response();
return -EINVAL;
}
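+ //propagate the requested output format to the JSON-processor definitions as well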
+ if (m_outputFormat == OutputFormat::JSON) {
+ json.output_json_format = true;
+ }
+
//parsing the SQL statement
s3select_syntax.parse_query(m_sql_query.c_str());
if (s3select_syntax.get_error_description().empty() == false) {
}
//initializing json processor
- json_object::csv_definitions output_definition;
- m_s3_json_object.set_json_query(&s3select_syntax,output_definition);
+ m_s3_json_object.set_json_query(&s3select_syntax, json);
if (input == nullptr) {
input = "";
ldpp_dout(this, 10) << "s3select: engine is set to process Parquet objects" << dendl;
}
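+ //detect the requested output serialization in the request XML; the output format defaults to CSV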
+ if (m_s3select_query.find(output_tag+"><JSON") != std::string::npos) {
+ m_outputFormat = OutputFormat::JSON;
+ }
+
extract_by_tag(m_s3select_query, "Expression", sql_query);
extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
RGWGetObj::parse_range();
requested_buffer.clear();
m_request_range = len;
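+ //account for the bytes fetched for this range request; the accumulated size is later reported in the stats/end responses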
+ m_aws_response_handler.update_processed_size(len);
ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl;
RGWGetObj::execute(y);
if (buff) {
m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
}
- if(s->cct->_conf->rgw_disable_s3select == true)
+ if (s->cct->_conf->rgw_disable_s3select == true)
{
std::string error_msg="s3select is disabled by the rgw_disable_s3select configuration parameter";
ldpp_dout(this, 10) << error_msg << dendl;
ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
op_ret = -ERR_INVALID_REQUEST;
} else {
- ldout(s->cct, 10) << "S3select: complete query with success " << dendl;
+ {//status per amount of processed data
+ //TODO add number of calls to range_request
+ m_aws_response_handler.init_stats_response();
+ m_aws_response_handler.send_stats_response();
+ m_aws_response_handler.init_end_response();
+ ldpp_dout(this, 10) << "s3select : reached the end of parquet query request : aws_response_handler.get_processed_size() "
+ << m_aws_response_handler.get_processed_size()
+ << "m_object_size_for_processing : " << m_object_size_for_processing << dendl;
+ }
+ ldout(s->cct, 10) << "S3select: complete parquet query with success " << dendl;
}
} else {
//CSV or JSON processing
m_requested_range = (m_end_scan_sz - m_start_scan_sz);
- if(m_is_trino_request){
+ if (m_is_trino_request){
// fetch more than requested (m_scan_offset); the additional bytes are scanned for the end of a row,
// so the extra length is processed and no row is broken for Trino.
// assumption: a row is smaller than m_scan_offset. (a different approach is to request an additional range)
fp_chunked_transfer_encoding();
size_t append_in_callback = 0;
int part_no = 1;
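+ //pass the rgw_parquet_buffer_size configuration to the s3select parquet reader properties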
+ parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size);
//concat the requested buffer
for (auto& it : bl.buffers()) {
if (it.length() == 0) {
}
append_in_callback += it.length();
ldout(s->cct, 10) << "S3select: part " << part_no++ << " it.length() = " << it.length() << dendl;
+ if ((ofs + len) > it.length()){
+ ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+ ofs = 0;
+ len = it.length();
+ }
requested_buffer.append(&(it)[0]+ofs, len);
}
ldout(s->cct, 10) << "S3select:append_in_callback = " << append_in_callback << dendl;
//the purpose is to return "perfect" results, with no broken or missing lines.
off_t new_offset = 0;
- if(m_scan_range_ind){//only upon range-scan
+ if (m_scan_range_ind){//only upon range-scan
int64_t sc=0;
int64_t start =0;
const char* row_delimiter = m_row_delimiter.c_str();
ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should modified." << dendl;
//chop the head of the first chunk and only upon the slice does not include the head of the object.
- if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+ if (m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
char* p = const_cast<char*>(it_cp+ofs);
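+ //scan forward to the first row-delimiter; new_offset is set just past it so the leading partial row is skipped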
while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++;
- if(!strncmp(row_delimiter,p,1)){
+ if (!strncmp(row_delimiter,p,1)){
new_offset += (p - (it_cp+ofs))+1;
}
}
//chop the end of the last chunk for this request
//if it's the last chunk, search for the first row-delimiter per the following use-cases
- if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
+ if ((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
//passed the requested range; start searching for the first delimiter
- if(m_aws_response_handler.get_processed_size()>m_requested_range){
+ if (m_aws_response_handler.get_processed_size()>m_requested_range){
//the previous chunk contains the complete request (all data) plus extra bytes.
//thus, search for the first row-delimiter
//[:previous (RR) ... ][:current (RD) ]
start = 0;
- } else if(m_aws_response_handler.get_processed_size()){
+ } else if (m_aws_response_handler.get_processed_size()){
//the *current* chunk contains the complete request in the middle of the chunk.
//thus, search for the first row-delimiter after the complete request position
//[:current (RR) .... (RD) ]
for(sc=start;sc<len;sc++)//assumption: a row-delimiter must exist, or it's the end of the object
{
char* p = const_cast<char*>(it_cp) + ofs + sc;
- if(!strncmp(row_delimiter,p,1)){
+ if (!strncmp(row_delimiter,p,1)){
ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
len = sc + 1;//+1 accounts for the delimiter. TODO: should m_object_size_for_processing be updated according to len?
//the end of row exist in current chunk.
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
int status = 0;
- if(m_skip_next_chunk == true){
+ if (m_skip_next_chunk == true){
return status;
}
}
- if(ofs > it.length()){
+ if (ofs > it.length()){
//safety check
ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
ofs = 0;
}
- if(m_is_trino_request){
+ if (m_is_trino_request){
//TODO: replace len with it.length()? test the Trino flow with compressed objects.
//is it possible to send get-by-ranges? in parallel?
shape_chunk_per_trino_requests(&(it)[0], ofs, len);
continue;
}
- if((ofs + len) > it.length()){
+ if ((ofs + len) > it.length()){
ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
ofs = 0;
len = it.length();