m_buff_header.clear();
header_size = create_header_records();
sql_result.append(m_buff_header.c_str(), header_size);
- sql_result.append(PAYLOAD_LINE);
+ //sql_result.append(PAYLOAD_LINE); //TODO add switch
}
void aws_response_handler::send_continuation_response()
void aws_response_handler::send_success_response()
{
- sql_result.append(END_PAYLOAD_LINE);
+ //sql_result.append(END_PAYLOAD_LINE);
int buff_len = create_message(header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
m_object_size_for_processing(0),
m_parquet_type(false),
m_json_type(false),
- chunk_number(0)
+ chunk_number(0),
+ m_requested_range(0)
{
set_get_data(true);
fp_get_obj_size = [&]() {
//CSV or JSON processing
if (m_scan_range_ind) {
//scan-range
- range_request(m_start_scan_sz, m_end_scan_sz - m_start_scan_sz, nullptr, y);
+ //TODO: add 1024 to m_end_scan_sz, and search for the first row-delimiter starting at m_end_scan_sz (before adding 1024)
+ size_t scan_offset = 1024; //TODO data member
+
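+ //if the object size is known, shrink the extra bytes so the range request does not extend past the end of the object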
+ if(s->obj_size && ((m_end_scan_sz + scan_offset) > s->obj_size)){
+ scan_offset = s->obj_size - m_end_scan_sz;
+ }
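+ //m_requested_range is the total byte count requested for this scan; csv_processing uses it to detect the last chunk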
+ m_requested_range = (m_end_scan_sz - m_start_scan_sz) + scan_offset;
+ range_request(m_start_scan_sz, (m_end_scan_sz - m_start_scan_sz) + scan_offset, nullptr, y);
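+ //the extra scan_offset bytes make it likely that the last row's delimiter is fetched; continue_to_end_of_csv_row() later trims the processed length back to that delimiter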
+ //TODO a second range request?? csv_processing may indicate a need for a second round(what about another one?)
} else {
RGWGetObj::execute(y);
}
}
return 0;
}
+#if 1
+void RGWSelectObj_ObjStore_S3::continue_to_end_of_csv_row(const char* it_cp, off_t ofs, off_t& len)
+{
+//in case it is a scan-range response and the row-delimiter(\n) is not included in the last row,
+//it needs to fetch another range (default = 1024; maybe it's better to do that on the first call)
+ if(m_scan_range_ind){
+ size_t scan_offset = 1024; //TODO data member
+
+ // if it's the last chunk, reduce scan-offset and search for the first "\n"
+ if((m_aws_response_handler.get_processed_size() + len) >= m_requested_range)
+ {
+ //start the search scan_offset bytes before the end of the chunk (or at its start, if the chunk is smaller)
+ off_t sc = (len > static_cast<off_t>(scan_offset)) ? (len - static_cast<off_t>(scan_offset)) : 0;
+ for(; sc < len; sc++)
+ {
+ const char* p = it_cp + ofs + sc;
+ if(*p == '\n'){
+ len = sc + 1;//+1 to include the row delimiter
+ break;
+ }
+ }
+ ldpp_dout(this, 10) << "len = " << len << dendl;
+ }
+ }
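+ //record the (possibly trimmed) len as processed; csv_processing compares the processed size against the object size to decide when the request is complete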
+ m_aws_response_handler.update_processed_size(len);
+}
+#endif
int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
{
}
} else {
auto bl_len = bl.get_num_buffers();
- int i=0;
+ int buff_no=0;
for(auto& it : bl.buffers()) {
- ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+ ldpp_dout(this, 10) << "processing segment " << buff_no << " out of " << bl_len << " off " << ofs
<< " len " << len << " obj-size " << m_object_size_for_processing << dendl;
if (it.length() == 0 || len == 0) {
- ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len
+ ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << buff_no << " out of " << bl_len
<< " obj-size " << m_object_size_for_processing << dendl;
continue;
}
- //NOTE: the it.length() must be used (not len)
- m_aws_response_handler.update_processed_size(it.length());
if((ofs + len) > it.length()){
ldpp_dout(this, 10) << "offset and lenghth may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
ofs = 0;
len = it.length();
}
- status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] +ofs, len);
- if (status<0) {
- return -EINVAL;
- }
- if (m_s3_csv_object.is_sql_limit_reached()) {
- break;
- }
- i++;
+
+ //TODO : should run only for the last element of bufferlist
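+ //on the last chunk of a scan-range request this may shrink len, so run_s3select_on_csv stops right after a row delimiter instead of mid-row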
+ continue_to_end_of_csv_row(&(it)[0], ofs, len);
+ status = run_s3select_on_csv(m_sql_query.c_str(), &(it)[0] + ofs, len);
+ if (status<0) {
+ return -EINVAL;
}
- }
- if (m_aws_response_handler.get_processed_size() == uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
+ if (m_s3_csv_object.is_sql_limit_reached()) {
+ break;
+ }
+ buff_no++;
+ }//for
+ }//else
+
+ ldpp_dout(this, 10) << "s3select : m_aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size()
+ << " m_object_size_for_processing " << uint64_t(m_object_size_for_processing) << dendl;
+
+ if (m_aws_response_handler.get_processed_size() >= uint64_t(m_object_size_for_processing) || m_s3_csv_object.is_sql_limit_reached()) {
if (status >=0) {
m_aws_response_handler.init_stats_response();
m_aws_response_handler.send_stats_response();
m_aws_response_handler.init_end_response();
+ ldpp_dout(this, 10) << "s3select : reached the end of query request : aws_response_handler.get_processed_size() " << m_aws_response_handler.get_processed_size()
+ << "m_object_size_for_processing : " << m_object_size_for_processing << dendl;
}
if (m_s3_csv_object.is_sql_limit_reached()) {
//stop fetching chunks
status = -ENOENT;
}
}
+
return status;
}