m_buff_header.clear();
header_size = create_header_records();
sql_result.append(m_buff_header.c_str(), header_size);
- //sql_result.append(PAYLOAD_LINE); //TODO add switch
+#ifdef PAYLOAD_TAG
+ sql_result.append(PAYLOAD_LINE);
+#endif
}
void aws_response_handler::send_continuation_response()
void aws_response_handler::send_success_response()
{
- //sql_result.append(END_PAYLOAD_LINE);
+#ifdef PAYLOAD_TAG
+ sql_result.append(END_PAYLOAD_LINE);
+#endif
int buff_len = create_message(header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
m_parquet_type(false),
m_json_type(false),
chunk_number(0),
- m_requested_range(0)
+ m_requested_range(0),
+ m_scan_offset(1024)
{
set_get_data(true);
fp_get_obj_size = [&]() {
} else {
//CSV or JSON processing
if (m_scan_range_ind) {
- //scan-range
- //TODO add 1024 to m_end_scan_sz. and search for first row-delimiter starting m_end_scan_sz(before adding 1024)
- size_t scan_offset = 1024; //TODO data member
-
- if(s->obj_size && ((m_end_scan_sz + scan_offset) > s->obj_size)){
- scan_offset = s->obj_size - m_end_scan_sz;
- }
- m_requested_range = (m_end_scan_sz - m_start_scan_sz) + scan_offset;
- range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + scan_offset, nullptr, y);
- //TODO a second range request?? csv_processing may indicate a need for a second round(what about another one?)
+
+ m_requested_range = (m_end_scan_sz - m_start_scan_sz);
+ // Fetch m_scan_offset bytes beyond the requested range. The extra bytes are scanned for the end of the
+ // last row, so that row is processed in full and Trino never receives a broken row.
+ // Assumption: a row is smaller than m_scan_offset. (An alternative approach is to issue an additional range request.)
+ range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + m_scan_offset, nullptr, y);
} else {
RGWGetObj::execute(y);
}
}
return 0;
}
-#if 1
-void RGWSelectObj_ObjStore_S3::continue_to_end_of_csv_row(const char* it_cp, off_t ofs, off_t& len)
-{
-//in case it is a scan range response, and row-delimiter(\n) is not included in last row
-//in needs to fetch another range(defualt = 1024, maybe its better to that on first call)
- if(m_scan_range_ind){
- int sc=0;
- size_t scan_offset = 1024; //TODO data member
-
- // if it's the last chunk , reduce scan-offset and search for first "\n"
- if((m_aws_response_handler.get_processed_size()+len)>=m_requested_range)
- {
- for(sc=(len - scan_offset); sc < len; sc++)
+
+void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp, off_t& ofs, off_t& len)
+{
+// Shapes a chunk of a scan-range response so that only complete rows are processed.
+//   it_cp : start of the chunk buffer; the bytes to process begin at it_cp + ofs.
+//   ofs   : in/out, advanced past the (possibly broken) first row when the head is chopped.
+//   len   : in/out, number of bytes to process starting at it_cp + ofs; reduced when the head
+//           and/or the tail of the chunk is chopped.
+// Nothing is chopped unless m_scan_range_ind is set. In every case the processed size is advanced
+// by len as it stands after the tail chop and before the head chop is subtracted.
+ off_t new_offset = 0;
+ if(m_scan_range_ind){//only upon range-scan
+ // signed indexes, so that the comparison against len (off_t) is not a signed/unsigned mix
+ off_t sc = 0;
+ off_t start = 0;
+ // only the first byte of the delimiter is compared (same as the 1-byte strncmp it replaces)
+ const char row_delimiter = m_row_delimiter.c_str()[0];
+ // the data handed to the CSV engine starts at it_cp + ofs, so both scans use that base
+ const char* chunk = it_cp + ofs;
+ const uint64_t processed = m_aws_response_handler.get_processed_size();
+ const uint64_t requested = m_requested_range;
+
+ //chop the head of the first chunk.
+ //the scan is bounded by len, thus no byte beyond the chunk is read when the delimiter is missing.
+ if(m_start_scan_sz && (processed == 0) && (len > 0)){
+ const void* first_delimiter = memchr(chunk, row_delimiter, static_cast<size_t>(len));
+ if(first_delimiter != nullptr){
+ new_offset += (static_cast<const char*>(first_delimiter) - chunk) + 1;
+ }
+ }
+
+ //chop the end of chunk
+ //if it's the last chunk, reduce scan-offset and search for first row-delimiter
+ if((m_aws_response_handler.get_processed_size()+len) >= m_requested_range){
+ //had pass the requested range, start to search for first delimiter
+ //the requested range ends inside this chunk : start the search where the range ends.
+ //the requested range ended in a previous chunk : start the search at the head of this chunk.
+ //(the subtraction is done only when it can not wrap around)
+ start = (processed < requested) ? static_cast<off_t>(requested - processed) : 0;
+
+ ldout(s->cct, 10) << "S3select: start scan from " << start << " len = " << len << dendl;
+ for(sc=start;sc<len;sc++)//assumption : row-delimiter must exist or it is the end of the object
{
- char* p=(char*)it_cp + ofs + sc;
- if(!strncmp("\n",p,1))
+ if(chunk[sc] == row_delimiter){
+ ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
+ len = sc + 1;//+1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len)
break;
+ }
}
- len = sc + 1;//+1 for delimiter
ldpp_dout(this, 10) << "len =" << len << dendl;
}
+
+ ofs += new_offset;
}
- m_aws_response_handler.update_processed_size(len);
+
+ ldout(s->cct, 10) << "S3select: shape_chunk_per_trino_requests:update progress len = " << len << dendl;
+ m_aws_response_handler.update_processed_size(len);//TODO : to run analysis to validate len is aligned with m_processed_bytes
+ len -= new_offset;
}
-#endif
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
len = it.length();
}
- //TODO : should run only for the last element of bufferlist
- continue_to_end_of_csv_row(&(it)[0], ofs, len);
+ shape_chunk_per_trino_requests(&(it)[0], ofs, len);
status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len);
if (status<0) {
return -EINVAL;
if (m_end_scan_sz == -1){
m_end_scan_sz = s->obj_size;
}
- m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+ if ((m_end_scan_sz - m_start_scan_sz)>s->obj_size){ //in the case user provides range bigger than object-size
+ m_object_size_for_processing = s->obj_size;
+ } else {
+ m_object_size_for_processing = m_end_scan_sz - m_start_scan_sz;
+ }
}
if (!m_aws_response_handler.is_set()) {
m_aws_response_handler.set(s, this);