m_json_type(false),
chunk_number(0),
m_requested_range(0),
- m_scan_offset(1024)
+ m_scan_offset(1024),
+ m_skip_next_chunk(false)
{
set_get_data(true);
fp_get_obj_size = [&]() {
void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len)
{
-//in case it is a scan range response, and row might be broken, this routine search for the end-of-row.
+//in case it is a scan range request and sent by Trino client.
+//this routine chops the start/end of chunks.
+//the purpose is to return "perfect" results, with no broken or missing lines.
+
off_t new_offset = 0;
if(m_scan_range_ind){//only upon range-scan
- uint64_t sc=0;
- uint64_t start =0;
+ int64_t sc=0;
+ int64_t start =0;
const char* row_delimiter = m_row_delimiter.c_str();
//chop the head of the first chunk.
if(m_start_scan_sz && (m_aws_response_handler.get_processed_size()==0)){
- char* p = (char*)it_cp;
+ char* p = const_cast<char*>(it_cp);
while(strncmp(row_delimiter,p,1) && (p - it_cp) < len)p++;
if(!strncmp(row_delimiter,p,1)){
new_offset += (p - it_cp)+1;
}
}
- //chop the end of chunk
- //if it's the last chunk, reduce scan-offset and search for first row-delimiter
+ //chop the end of the last chunk for this request
+ //if it's the last chunk, search for first row-delimiter for the following different use-cases
if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
//had pass the requested range, start to search for first delimiter
- if(m_aws_response_handler.get_processed_size()){
- start = m_aws_response_handler.get_processed_size() - m_requested_range;
+ if(m_aws_response_handler.get_processed_size()>m_requested_range){
+ //the previous chunk contain the complete request(all data) and an extra bytes.
+ //thus, search for the first row-delimiter
+ start = 0;
+ } else if(m_aws_response_handler.get_processed_size()){
+ //the *current* chunk contain the complete request in the middle of the chunk.
+ //thus, search for the first row-delimiter after the complete request position
+ start = m_requested_range - m_aws_response_handler.get_processed_size();
} else {
+ //the current chunk is the first chunk and it contains complete request
start = m_requested_range;
}
ldout(s->cct, 10) << "S3select: start scan from " << start << " len = " << len << dendl;
for(sc=start;sc<len;sc++)//assumption : row-delimiter must exist or its end ebject
{
- char* p = (char*)it_cp + ofs + sc;
+ char* p = const_cast<char*>(it_cp) + ofs + sc;
if(!strncmp(row_delimiter,p,1)){
ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
len = sc + 1;//+1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len)
+ //the end of row exist in current chunk.
+ //thus, the next chunk should be skipped
+ m_skip_next_chunk = true;
break;
}
}
ldpp_dout(this, 10) << "len =" << len << dendl;
}
-
ofs += new_offset;
}
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
int status = 0;
-
+
+ if(m_skip_next_chunk == true){
+ return status;
+ }
+
if (s->obj_size == 0 || m_object_size_for_processing == 0) {
status = run_s3select_on_csv(m_sql_query.c_str(), nullptr, 0);
if (status<0){
if (m_end_scan_sz == -1){
m_end_scan_sz = s->obj_size;
}
- if ((m_end_scan_sz - m_start_scan_sz)>s->obj_size){ //in the case user provides range bigger than object-size
+ if (static_cast<uint64_t>((m_end_scan_sz - m_start_scan_sz))>s->obj_size){ //in the case user provides range bigger than object-size
m_object_size_for_processing = s->obj_size;
} else {
m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;