m_buff_header.clear();
header_size = create_header_records();
sql_result.append(m_buff_header.c_str(), header_size);
+#ifdef PAYLOAD_TAG
sql_result.append(PAYLOAD_LINE);
+#endif
}
void aws_response_handler::send_continuation_response()
void aws_response_handler::send_success_response()
{
+#ifdef PAYLOAD_TAG
sql_result.append(END_PAYLOAD_LINE);
+#endif
int buff_len = create_message(header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
m_object_size_for_processing(0),
m_parquet_type(false),
m_json_type(false),
- chunk_number(0)
+ chunk_number(0),
+ m_requested_range(0),
+ m_scan_offset(1024),
+ m_skip_next_chunk(false),
+ m_is_trino_request(false)
{
set_get_data(true);
fp_get_obj_size = [&]() {
ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
return -1;
}
+ const auto& m = s->info.env->get_map();
+ auto user_agent = m.find("HTTP_USER_AGENT");
+ if (user_agent != m.end()) {
+ if (user_agent->second.find("Trino") != std::string::npos) {
+ m_is_trino_request = true;
+ ldpp_dout(this, 10) << "s3-select query: request sent by Trino." << dendl;
+ }
+ }
+
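+ //note: detecting Trino by the "Trino" substring in the User-Agent header is a heuristic;
+ //for any other client m_is_trino_request stays false and the regular range handling is used.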
int status = handle_aws_cli_parameters(m_sql_query);
if (status<0) {
return status;
if (m_escape_char.size()) {
csv.escape_char = *m_escape_char.c_str();
}
- if (m_enable_progress.compare("true")==0) {
- enable_progress = true;
- } else {
- enable_progress = false;
- }
if (output_row_delimiter.size()) {
csv.output_row_delimiter = *output_row_delimiter.c_str();
}
}
if ((length_post_processing-length_before_processing) != 0) {
ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
- ldpp_dout(this, 10) << "s3-select: sql-result{" << m_aws_response_handler.get_sql_result() << "}" << dendl;
} else {
m_aws_response_handler.send_continuation_response();
}
m_end_scan_sz = std::numeric_limits<std::int64_t>::max();
}
}
+ if (m_enable_progress.compare("true")==0) {
+ enable_progress = true;
+ } else {
+ enable_progress = false;
+ }
return 0;
}
} else {
//CSV or JSON processing
if (m_scan_range_ind) {
- //scan-range
- range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y);
+
+ m_requested_range = (m_end_scan_sz - m_start_scan_sz);
+
+ if(m_is_trino_request){
+ // fetch more than requested (by m_scan_offset); the extra bytes are scanned for the end of a row,
+ // so the additional length is processed and no row is left broken for Trino.
+ // assumption: a row is smaller than m_scan_offset. (a different approach is to request an additional range)
+ range_request(m_start_scan_sz, m_requested_range + m_scan_offset, nullptr, y);
+ } else {
+ range_request(m_start_scan_sz, m_requested_range, nullptr, y);
+ }
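+ //illustration (assumed numbers): for a scan range covering bytes 1000..3000, m_requested_range is 2000;
+ //the Trino path fetches 2000 + 1024 (m_scan_offset) bytes, so a row crossing byte 3000 can still be
+ //completed, given the row-size assumption above.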
+
} else {
RGWGetObj::execute(y);
}
return 0;
}
+void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len)
+{
+//in case this is a scan-range request sent by a Trino client,
+//this routine chops the start/end of chunks.
+//the purpose is to return "perfect" results, with no broken or missing lines.
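+//in short: the head of the first chunk is advanced past the first row-delimiter, and the tail of the last
+//chunk is cut right after the row-delimiter that closes the requested range (the following chunk is then skipped).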
+
+ off_t new_offset = 0;
+ if(m_scan_range_ind){//only upon range-scan
+ int64_t sc=0;
+ int64_t start =0;
+ const char* row_delimiter = m_row_delimiter.c_str();
+
+ ldpp_dout(this, 10) << "s3select query: per Trino request the first and last chunk should modified." << dendl;
+
+ //chop the head of the first chunk, but only when the slice does not include the head of the object.
+ if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
+ char* p = const_cast<char*>(it_cp+ofs);
+ while(strncmp(row_delimiter,p,1) && (p - (it_cp+ofs)) < len)p++;
+ if(!strncmp(row_delimiter,p,1)){
+ new_offset += (p - (it_cp+ofs))+1;
+ }
+ }
+
+ //RR : end of the range-request (the original range requested by the Trino client)
+ //RD : row-delimiter
+ //[ ... ] : chunk boundaries
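+ //worked example (illustrative numbers): with m_requested_range = 2000 and 1500 bytes already processed,
+ //a 1000-byte chunk crosses RR; the search for RD then starts at offset 2000 - 1500 = 500 inside that chunk.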
+
+ //chop the end of the last chunk for this request.
+ //if it is the last chunk, search for the first row-delimiter according to the following use-cases
+ if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
+ //we have passed the requested range; start searching for the first delimiter
+ if(m_aws_response_handler.get_processed_size()>m_requested_range){
+ //the previous chunk contains the complete request (all data) plus extra bytes.
+ //thus, search for the first row-delimiter
+ //[:previous (RR) ... ][:current (RD) ]
+ start = 0;
+ } else if(m_aws_response_handler.get_processed_size()){
+ //the *current* chunk contains the complete request in the middle of the chunk.
+ //thus, search for the first row-delimiter after the position where the request completes
+ //[:current (RR) .... (RD) ]
+ start = m_requested_range - m_aws_response_handler.get_processed_size();
+ } else {
+ //the current chunk is the first chunk and it contains the complete request
+ //[:current:first-chunk (RR) .... (RD) ]
+ start = m_requested_range;
+ }
+
+ for(sc=start;sc<len;sc++)//assumption: a row-delimiter must exist, or this is the end of the object
+ {
+ char* p = const_cast<char*>(it_cp) + ofs + sc;
+ if(!strncmp(row_delimiter,p,1)){
+ ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
+ len = sc + 1;//+1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len)
+ //the end of row exist in current chunk.
+ //thus, the next chunk should be skipped
+ m_skip_next_chunk = true;
+ break;
+ }
+ }
+ }
+ ofs += new_offset;
+ }
+
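+ //new_offset stays 0 when this is not a range-scan, so the length adjustment below is a no-op in that case.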
+ ldout(s->cct, 10) << "S3select: shape_chunk_per_trino_requests:update progress len = " << len << dendl;
+ len -= new_offset;
+}
+
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
int status = 0;
-
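+ //m_skip_next_chunk is set by shape_chunk_per_trino_requests() once the row-delimiter that closes the
+ //requested range has been found; any chunk arriving after that is not needed and is skipped here.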
+ if (m_skip_next_chunk) {
+ return status;
+ }
+
if (s->obj_size == 0 || m_object_size_for_processing == 0) {
status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0);
if (status<0){
}
} else {
auto bl_len = bl.get_num_buffers();
- int i=0;
+ int buff_no=0;
for(auto& it : bl.buffers()) {
- ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+ ldpp_dout(this, 10) << "s3select :processing segment " << buff_no << " out of " << bl_len << " off " << ofs
<< " len " << len << " obj-size " << m_object_size_for_processing << dendl;
if (it.length() == 0 || len == 0) {
- ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
+ ldpp_dout(this, 10) << "s3select :it->_len is zero. segment " << buff_no << " out of " << bl_len
<< " obj-size " << m_object_size_for_processing << dendl;
continue;
}
- //NOTE: the it.length() must be used (not len)
- m_aws_response_handler.update_processed_size(it.length());
if((ofs + len) > it.length()){
- ldpp_dout(this, 10) << "offset and lenghth may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+ ldpp_dout(this, 10) << "offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
ofs = 0;
len = it.length();
}
- status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] +ofs, len);
- if (status<0) {
- return -EINVAL;
- }
- if (m_s3_csv_object.is_sql_limit_reached()) {
- break;
- }
- i++;
+
+ if(m_is_trino_request){
+ shape_chunk_per_trino_requests(&(it)[0], ofs, len);
}
- }
- if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
+
+ ldpp_dout(this, 10) << "s3select: chunk: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << " m_object_size_for_processing = " << m_object_size_for_processing << dendl;
+
+ m_aws_response_handler.update_processed_size(it.length());//NOTE: analysis should validate that len stays aligned with m_processed_bytes
+ status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len);
+ if (status<0) {
+ return -EINVAL;
+ }
+ if (m_s3_csv_object.is_sql_limit_reached()) {
+ break;
+ }
+ buff_no++;
+ }//for
+ }//else
+
+ ldpp_dout(this, 10) << "s3select : m_aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size()
+ << " m_object_size_for_processing " << uint64_t(m_object_size_for_processing) << dendl;
+
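+ //note: '>=' rather than '==' because a Trino range request fetches m_scan_offset extra bytes,
+ //so the processed size may exceed m_object_size_for_processing.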
+ if (m_aws_response_handler.get_processed_size() >= uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
if (status >=0) {
m_aws_response_handler.init_stats_response();
m_aws_response_handler.send_stats_response();
m_aws_response_handler.init_end_response();
+ ldpp_dout(this, 10) << "s3select : reached the end of query request : aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size()
+ << "m_object_size_for_processing : " << m_object_size_for_processing << dendl;
}
if (m_s3_csv_object.is_sql_limit_reached()) {
//stop fetching chunks
status = -ENOENT;
}
}
+
return status;
}
<< " obj-size " << m_object_size_for_processing << dendl;
continue;
}
+
+ if((ofs + len) > it.length()){
+ ldpp_dout(this, 10) << "s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
+ ofs = 0;
+ len = it.length();
+ }
+
m_aws_response_handler.update_processed_size(len);
- status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0], len);
+ status = run_s3select_on_json(m_sql_query.c_str(), &(it)[0] + ofs, len);
if (status<0) {
status = -EINVAL;
break;
break;
}
i++;
- }
- }
+ }//for
+ }//else
if (status>=0 && (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_json_object.is_sql_limit_reached())) {
//flush the internal JSON buffer(upon last chunk)
if (m_end_scan_sz == -1){
m_end_scan_sz = s->obj_size;
}
- m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+ if (static_cast<uint64_t>((m_end_scan_sz - m_start_scan_sz))>s->obj_size){ //in case the user provides a range bigger than the object size
+ m_object_size_for_processing = s->obj_size;
+ } else {
+ m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+ }
}
if (!m_aws_response_handler.is_set()) {
m_aws_response_handler.set(s, this);