void aws_response_handler::send_continuation_response()
{
+ m_fp_chunk_encoding();
set_continue_buffer();
continue_result.resize(header_crc_size, '\0');
get_buffer()->clear();
void aws_response_handler::init_end_response()
{
+ m_fp_chunk_encoding();
sql_result.resize(header_crc_size, '\0');
get_buffer()->clear();
header_size = create_header_end();
rgw_flush_formatter_and_reset(s, s->formatter);
}
-void aws_response_handler::send_error_response(const char* error_message)
+void aws_response_handler::send_error_response(const char* error_code, const char* error_message, const char* resource_id)
{
- //currently not in use. need to change the s3-test, this error-response raises a boto3 exception
+ m_fp_chunk_encoding();
+ std::string out_error_msg = std::string(error_code) + " :" + std::string(error_message) + " :" + std::string(resource_id);
error_result.resize(header_crc_size, '\0');
get_buffer()->clear();
- header_size = create_error_header_records(error_message);
+ header_size = create_error_header_records(out_error_msg.data());
error_result.append(get_buffer()->c_str(), header_size);
int buff_len = create_message(header_size,&error_result);
#ifdef PAYLOAD_TAG
sql_result.append(END_PAYLOAD_LINE);
#endif
+ m_fp_chunk_encoding();
int buff_len = create_message(m_success_header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
}
-void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
- const char* error_message,
- const char* resource_id)
+static constexpr const char* empty_error="--";
+
+void aws_response_handler::send_error_response_rgw_formatter(const char* error_code = empty_error,
+ const char* error_message = empty_error,
+ const char* resource_id = empty_error)
{
set_req_state_err(s, 0);
dump_errno(s, 400);
void aws_response_handler::send_progress_response()
{
+ m_fp_chunk_encoding();
std::string progress_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
, get_processed_size(), get_processed_size(), get_total_bytes_returned());
sql_result.append(progress_payload);
void aws_response_handler::send_stats_response()
{
+ m_fp_chunk_encoding();
std::string stats_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
, get_processed_size(), get_processed_size(), get_total_bytes_returned());
sql_result.append(stats_payload);
return 0;
};
fp_s3select_result_format = [this](std::string& result) {
- fp_chunked_transfer_encoding();
m_aws_response_handler.send_success_response();
return 0;
};
fp_s3select_continue = [this](std::string& result) {
- fp_chunked_transfer_encoding();
m_aws_response_handler.send_continuation_response();
return 0;
};
}
chunk_number++;
};
+
}
RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
if (s3select_syntax.get_error_description().empty() == false) {
//error-flow (syntax-error)
- m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
+ m_aws_response_handler.send_error_response(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
return -1;
if (status < 0) {
//error flow(processing-time)
- m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
+ m_aws_response_handler.send_error_response(s3select_processTime_error,m_s3_csv_object.get_error_description().data(),s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
return -1;
}
ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
if (enable_progress == true) {
- fp_chunked_transfer_encoding();
m_aws_response_handler.init_progress_response();
m_aws_response_handler.send_progress_response();
}
}
if (s3select_syntax.get_error_description().empty() == false) {
//the SQL statement failed the syntax parser
- fp_chunked_transfer_encoding();
- m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+ m_aws_response_handler.send_error_response(s3select_syntax_error,m_s3_parquet_object.get_error_description().c_str(),s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
status = -1;
status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
if (status < 0) {
- fp_chunked_transfer_encoding();
- m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+ m_aws_response_handler.send_error_response(s3select_processTime_error,m_s3_parquet_object.get_error_description().c_str(),s3select_resource_id);
return -1;
}
{
int status = 0;
- m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+ m_s3_json_object.set_external_system_functions(fp_s3select_continue,
fp_s3select_result_format,
fp_result_header_format,
fp_debug_mesg);
s3select_syntax.parse_query(m_sql_query.c_str());
if (s3select_syntax.get_error_description().empty() == false) {
//SQL statement is wrong(syntax).
- m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
+ m_aws_response_handler.send_error_response(s3select_syntax_error,
s3select_syntax.get_error_description().c_str(),
s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
} catch(base_s3select_exception& e) {
ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
m_aws_response_handler.get_sql_result().append(e.what());
- m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
+ m_aws_response_handler.send_error_response(s3select_processTime_error,
e.what(),
s3select_resource_id);
return -EINVAL;
m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
if (status < 0) {
//error flow(processing-time)
- m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
+ m_aws_response_handler.send_error_response(s3select_processTime_error,
m_s3_json_object.get_error_description().c_str(),
s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
return -EINVAL;
}
- fp_chunked_transfer_encoding();
if (length_post_processing-length_before_processing != 0) {
m_aws_response_handler.send_success_response();
#ifdef _ARROW_EXIST
m_rgw_api.m_y = &y;
#endif
+
+ if (!m_aws_response_handler.is_set()) {
+ m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
+ }
+
+ if(s->cct->_conf->rgw_disable_s3select == true)
+ {
+ std::string error_msg="s3select : is disabled by rgw_disable_s3select configuration parameter";
+ ldpp_dout(this, 10) << error_msg << dendl;
+ m_aws_response_handler.send_error_response_rgw_formatter(error_msg.data());
+
+ op_ret = -ERR_INVALID_REQUEST;
+ return;
+ }
+
if (m_parquet_type) {
//parquet processing
range_request(0, 4, parquet_magic, y);
int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
{
+
if (m_scan_range_ind == false){
m_object_size_for_processing = s->obj_size;
}
}
}
if (!m_aws_response_handler.is_set()) {
- m_aws_response_handler.set(s, this);
+ m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
}
if (len == 0 && s->obj_size != 0) {
return 0;