char x;
short s;
x = char(strlen(header_name));
- m_buff_header.append(&x, sizeof(x));
- m_buff_header.append(header_name);
+ get_buffer()->append(&x, sizeof(x));
+ get_buffer()->append(header_name);
x = char(7);
- m_buff_header.append(&x, sizeof(x));
+ get_buffer()->append(&x, sizeof(x));
s = htons(uint16_t(strlen(header_value)));
- m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
- m_buff_header.append(header_value);
+ get_buffer()->append(reinterpret_cast<char*>(&s), sizeof(s));
+ get_buffer()->append(header_value);
}
#define IDX( x ) static_cast<int>( x )
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_continuation()
push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
//2
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_progress()
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_stats()
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_header_end()
push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
//2
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
int aws_response_handler::create_error_header_records(const char* error_message)
push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
//3
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
- return m_buff_header.size();
+ return get_buffer()->size();
}
-int aws_response_handler::create_message(u_int32_t header_len)
+int aws_response_handler::create_message(u_int32_t header_len,std::string *msg_string = nullptr)
{
//message description(AWS):
//[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
//are created later to the produced SQL result, and actually wrapping the payload.
auto push_encode_int = [&](u_int32_t s, int pos) {
u_int32_t x = htonl(s);
- sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
+ msg_string->replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
};
+
+ msg_string = (msg_string == nullptr) ? &sql_result : msg_string;
+
u_int32_t total_byte_len = 0;
u_int32_t preload_crc = 0;
u_int32_t message_crc = 0;
- total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
+ total_byte_len = msg_string->size() + 4; //the total is greater in 4 bytes than current size
push_encode_int(total_byte_len, 0);
push_encode_int(header_len, 4);
crc32.reset();
- crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
+ crc32 = std::for_each(msg_string->data(), msg_string->data() + 8, crc32); //crc for starting 8 bytes
preload_crc = crc32();
push_encode_int(preload_crc, 8);
crc32.reset();
- crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+ crc32 = std::for_each(msg_string->begin(), msg_string->end(), crc32); //crc for payload + checksum
message_crc = crc32();
u_int32_t x = htonl(message_crc);
- sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
- return sql_result.size();
+ msg_string->append(reinterpret_cast<char*>(&x), sizeof(x));
+ return msg_string->size();
}
void aws_response_handler::init_response()
void aws_response_handler::init_success_response()
{
- m_buff_header.clear();
- header_size = create_header_records();
- sql_result.append(m_buff_header.c_str(), header_size);
-#ifdef PAYLOAD_TAG
- sql_result.append(PAYLOAD_LINE);
-#endif
+ get_buffer()->clear();
+ m_success_header_size = create_header_records();
+ sql_result.append(get_buffer()->c_str(), m_success_header_size);
}
void aws_response_handler::send_continuation_response()
{
- sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ set_continue_buffer();
+ continue_result.resize(header_crc_size, '\0');
+ get_buffer()->clear();
header_size = create_header_continuation();
- sql_result.append(m_buff_header.c_str(), header_size);
- int buff_len = create_message(header_size);
- s->formatter->write_bin_data(sql_result.data(), buff_len);
+ continue_result.append(get_buffer()->c_str(), header_size);
+ int buff_len = create_message(header_size,&continue_result);
+ s->formatter->write_bin_data(continue_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
+ get_buffer()->clear();
+ set_main_buffer();
}
void aws_response_handler::init_progress_response()
{
sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ get_buffer()->clear();
header_size = create_header_progress();
- sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(get_buffer()->c_str(), header_size);
}
void aws_response_handler::init_stats_response()
{
sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ get_buffer()->clear();
header_size = create_header_stats();
- sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(get_buffer()->c_str(), header_size);
}
void aws_response_handler::init_end_response()
{
sql_result.resize(header_crc_size, '\0');
- m_buff_header.clear();
+ get_buffer()->clear();
header_size = create_header_end();
- sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(get_buffer()->c_str(), header_size);
int buff_len = create_message(header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
}
-void aws_response_handler::init_error_response(const char* error_message)
+void aws_response_handler::send_error_response(const char* error_message)
{
- //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
- m_buff_header.clear();
+ //currently not in use. need to change the s3-test, this error-response raises a boto3 exception
+ error_result.resize(header_crc_size, '\0');
+ get_buffer()->clear();
header_size = create_error_header_records(error_message);
- sql_result.append(m_buff_header.c_str(), header_size);
+ error_result.append(get_buffer()->c_str(), header_size);
+
+ int buff_len = create_message(header_size,&error_result);
+ s->formatter->write_bin_data(error_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
}
void aws_response_handler::send_success_response()
#ifdef PAYLOAD_TAG
sql_result.append(END_PAYLOAD_LINE);
#endif
- int buff_len = create_message(header_size);
+ int buff_len = create_message(m_success_header_size);
s->formatter->write_bin_data(sql_result.data(), buff_len);
rgw_flush_formatter_and_reset(s, s->formatter);
}
-void aws_response_handler::send_error_response(const char* error_code,
+void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
const char* error_message,
const char* resource_id)
{
}
RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
- m_buff_header(std::make_unique<char[]>(1000)),
m_scan_range_ind(false),
m_start_scan_sz(0),
m_end_scan_sz(0),
m_aws_response_handler.send_success_response();
return 0;
};
+ fp_s3select_continue = [this](std::string& result) {
+ fp_chunked_transfer_encoding();
+ m_aws_response_handler.send_continuation_response();
+ return 0;
+ };
fp_debug_mesg = [&](const char* mesg){
ldpp_dout(this, 10) << mesg << dendl;
int status = 0;
uint32_t length_before_processing, length_post_processing;
csv_object::csv_defintions csv;
- const char* s3select_syntax_error = "s3select-Syntax-Error";
- const char* s3select_resource_id = "resource-id";
- const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
s3select_syntax.parse_query(query);
if (m_row_delimiter.size()) {
} else if(m_header_info.compare("USE")==0) {
csv.use_header_info=true;
}
- m_s3_csv_object.set_external_debug_system(fp_debug_mesg);
- m_s3_csv_object.set_result_formatters(fp_s3select_result_format,fp_result_header_format);
+
m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
+
+ m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+ fp_s3select_result_format,
+ fp_result_header_format,
+ fp_debug_mesg);
+
if (s3select_syntax.get_error_description().empty() == false) {
//error-flow (syntax-error)
- m_aws_response_handler.send_error_response(s3select_syntax_error,
- s3select_syntax.get_error_description().c_str(),
- s3select_resource_id);
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
return -1;
if (status < 0) {
//error flow(processing-time)
- m_aws_response_handler.send_error_response(s3select_processTime_error,
- m_s3_csv_object.get_error_description().c_str(),
- s3select_resource_id);
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
+
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
return -1;
}
}
if ((length_post_processing-length_before_processing) != 0) {
ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
- } else {
- m_aws_response_handler.send_continuation_response();
}
ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
if (enable_progress == true) {
if (!m_s3_parquet_object.is_set()) {
//parsing the SQL statement.
s3select_syntax.parse_query(m_sql_query.c_str());
- //m_s3_parquet_object.set_external_debug_system(fp_debug_mesg);
+
+ m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
+ fp_s3select_result_format,
+ fp_result_header_format,
+ fp_debug_mesg);
+
try {
//at this stage the Parquet-processing requires for the meta-data that reside on Parquet object
m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
}
if (s3select_syntax.get_error_description().empty() == false) {
//the SQL statement failed the syntax parser
- fp_result_header_format(m_aws_response_handler.get_sql_result());
- m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
- fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+ fp_chunked_transfer_encoding();
+ m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
status = -1;
} else {
fp_result_header_format(m_aws_response_handler.get_sql_result());
//at this stage the Parquet-processing "takes control", it keep calling to s3-range-request according to the SQL statement.
- status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
+ status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
if (status < 0) {
- m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
- fp_s3select_result_format(m_aws_response_handler.get_sql_result());
- ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
+
+ fp_chunked_transfer_encoding();
+ m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
+
+ return -1;
}
}
#endif
{
int status = 0;
- const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
- const char* s3select_syntax_error = "s3select-Syntax-Error";
- const char* s3select_resource_id = "resourcse-id";
- const char* s3select_json_error = "json-Format-Error";
+ m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
+ fp_s3select_result_format,
+ fp_result_header_format,
+ fp_debug_mesg);
m_aws_response_handler.init_response();
//the JSON data-type should be(currently) only DOCUMENT
if (m_json_datatype.compare("DOCUMENT") != 0) {
const char* s3select_json_error_msg = "s3-select query: wrong json dataType should use DOCUMENT; ";
- m_aws_response_handler.send_error_response(s3select_json_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_json_error,
s3select_json_error_msg,
s3select_resource_id);
ldpp_dout(this, 10) << s3select_json_error_msg << dendl;
s3select_syntax.parse_query(m_sql_query.c_str());
if (s3select_syntax.get_error_description().empty() == false) {
//SQL statement is wrong(syntax).
- m_aws_response_handler.send_error_response(s3select_syntax_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
s3select_syntax.get_error_description().c_str(),
s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
} catch(base_s3select_exception& e) {
ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
m_aws_response_handler.get_sql_result().append(e.what());
- m_aws_response_handler.send_error_response(s3select_processTime_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
e.what(),
s3select_resource_id);
return -EINVAL;
m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
if (status < 0) {
//error flow(processing-time)
- m_aws_response_handler.send_error_response(s3select_processTime_error,
+ m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
m_s3_json_object.get_error_description().c_str(),
s3select_resource_id);
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
if (length_post_processing-length_before_processing != 0) {
m_aws_response_handler.send_success_response();
- } else {
- m_aws_response_handler.send_continuation_response();
}
if (enable_progress == true) {
m_aws_response_handler.init_progress_response();