using namespace s3selectEngine;
-const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
-const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"};
-RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
- s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
- m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
- m_buff_header(std::make_unique<char[]>(1000)),
- chunk_number(0),
- crc32(std::unique_ptr<boost::crc_32_type>())
+std::string &aws_response_handler::get_sql_result()
{
- set_get_data(true);
+ return sql_result;
}
-RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+uint64_t aws_response_handler::get_processed_size()
{
+ return processed_size;
}
-int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
+void aws_response_handler::set_processed_size(uint64_t value)
{
+ processed_size += value;
+}
- //retrieve s3-select query from payload
- bufferlist data;
- int ret;
- int max_size = 4096;
- std::tie(ret, data) = read_all_input(s, max_size, false);
- if (ret != 0) {
- ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
- return ret;
- }
+uint64_t aws_response_handler::get_total_bytes_returned()
+{
+ return total_bytes_returned;
+}
- m_s3select_query = data.to_str();
- if (m_s3select_query.length() > 0) {
- ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
- }
- else {
- ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
- return -1;
- }
+void aws_response_handler::set_total_bytes_returned(uint64_t value)
+{
+ total_bytes_returned += value;
+}
- int status = handle_aws_cli_parameters(m_sql_query);
+void aws_response_handler::push_header(const char *header_name, const char *header_value)
+{
+ char x;
+ short s;
- if (status<0) {
- return status;
- }
+ x = char(strlen(header_name));
+ m_buff_header.append(&x, sizeof(x));
+ m_buff_header.append(header_name);
- return RGWGetObj_ObjStore_S3::get_params(y);
+ x = char(7);
+ m_buff_header.append(&x, sizeof(x));
+
+ s = htons(uint16_t(strlen(header_value)));
+ m_buff_header.append(reinterpret_cast<char *>(&s), sizeof(s));
+ m_buff_header.append(header_value);
}
-void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i)
+int aws_response_handler::create_header_records()
{
- short x = htons(s);
- memcpy(buff, &x, sizeof(s));
- i+=sizeof(s);
+ //headers description(AWS)
+ //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+
+ //1
+ push_header(header_name_str[EVENT_TYPE], header_value_str[RECORDS]);
+ //2
+ push_header(header_name_str[CONTENT_TYPE], header_value_str[OCTET_STREAM]);
+ //3
+ push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+ return m_buff_header.size();
}
-void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i)
+int aws_response_handler::create_header_continuation()
{
- u_int32_t x = htonl(s);
- memcpy(buff, &x, sizeof(s));
- i+=sizeof(s);
+ //headers description(AWS)
+ //1
+ push_header(header_name_str[EVENT_TYPE], header_value_str[CONT]);
+ //2
+ push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+ return m_buff_header.size();
}
-int RGWSelectObj_ObjStore_S3::create_header_records(char* buff)
+int aws_response_handler::create_header_progress()
{
- int i = 0;
+ //headers description(AWS)
+ //1
+ push_header(header_name_str[EVENT_TYPE], header_value_str[PROGRESS]);
+ //2
+ push_header(header_name_str[CONTENT_TYPE], header_value_str[XML]);
+ //3
+ push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+ return m_buff_header.size();
+}
+int aws_response_handler::create_header_stats()
+{
//headers description(AWS)
- //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
-
//1
- buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
- memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
- i += strlen(header_name_str[EVENT_TYPE]);
- buff[i++] = char(7);
- encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
- memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
- i += strlen(header_value_str[RECORDS]);
+ push_header(header_name_str[EVENT_TYPE], header_value_str[STATS]);
+ //2
+ push_header(header_name_str[CONTENT_TYPE], header_value_str[XML]);
+ //3
+ push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+ return m_buff_header.size();
+}
+int aws_response_handler::create_header_end()
+{
+ //headers description(AWS)
+ //1
+ push_header(header_name_str[EVENT_TYPE], header_value_str[END]);
//2
- buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
- memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
- i += strlen(header_name_str[CONTENT_TYPE]);
- buff[i++] = char(7);
- encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
- memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
- i += strlen(header_value_str[OCTET_STREAM]);
+ push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+ return m_buff_header.size();
+}
+
+int aws_response_handler::create_error_header_records(const char *error_message)
+{
+ //headers description(AWS)
+ //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+ //1
+ push_header(header_name_str[ERROR_CODE], header_value_str[ENGINE_ERROR]);
+ //2
+ push_header(header_name_str[ERROR_MESSAGE], error_message);
//3
- buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
- memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
- i += strlen(header_name_str[MESSAGE_TYPE]);
- buff[i++] = char(7);
- encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
- memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
- i += strlen(header_value_str[EVENT]);
+ push_header(header_name_str[MESSAGE_TYPE], header_value_str[ERROR_TYPE]);
- return i;
+ return m_buff_header.size();
}
-int RGWSelectObj_ObjStore_S3::create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len)
+int aws_response_handler::create_message(u_int32_t header_len)
{
- //message description(AWS):
+ //message description(AWS):
//[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
- //s3select result is produced into m_result, the m_result is also the response-message, thus the attach headers and CRC
+ //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
//are created later to the produced SQL result, and actually wrapping the payload.
+ auto push_encode_int = [&](u_int32_t s, int pos) {
+ u_int32_t x = htonl(s);
+ sql_result.replace(pos, sizeof(x), reinterpret_cast<char *>(&x), sizeof(x));
+ };
+
u_int32_t total_byte_len = 0;
u_int32_t preload_crc = 0;
u_int32_t message_crc = 0;
- int i = 0;
- char * buff = out_string.data();
- if(crc32 ==0) {
- // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
- crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
- }
+ total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
- total_byte_len = result_len + 16;//the total is greater in 4 bytes than current size
+ push_encode_int(total_byte_len, 0);
+ push_encode_int(header_len, 4);
- encode_int(&buff[i], total_byte_len, i);//store sizes at the beginning of the buffer
- encode_int(&buff[i], header_len, i);
+ crc32.reset();
+ crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
+ preload_crc = crc32();
+ push_encode_int(preload_crc, 8);
- crc32->reset();
- *crc32 = std::for_each( buff, buff + 8, *crc32 );//crc for starting 8 bytes
- preload_crc = (*crc32)();
- encode_int(&buff[i], preload_crc, i);
+ crc32.reset();
+ crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+ message_crc = crc32();
- i += result_len;//advance to the end of payload.
+ u_int32_t x = htonl(message_crc);
+ sql_result.append(reinterpret_cast<char *>(&x), sizeof(x));
- crc32->reset();
- *crc32 = std::for_each( buff, buff + i, *crc32 );//crc for payload + checksum
- message_crc = (*crc32)();
- char out_encode[4];
- encode_int(out_encode, message_crc, i);
- out_string.append(out_encode,sizeof(out_encode));
+ return sql_result.size();
+}
+
+void aws_response_handler::init_response()
+{ //12 positions for header-crc
+ sql_result.resize(header_crc_size,'\0');
+}
+
+void aws_response_handler::init_success_response()
+{
+ m_buff_header.clear();
+ header_size = create_header_records();
+ sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(PAYLOAD_LINE);
+}
+
+void aws_response_handler::send_continuation_response()
+{
+ sql_result.resize(header_crc_size,'\0');
+ m_buff_header.clear();
+ header_size = create_header_continuation();
+ sql_result.append(m_buff_header.c_str(), header_size);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
- return i;
+void aws_response_handler::init_progress_response()
+{
+ sql_result.resize(header_crc_size,'\0');
+ m_buff_header.clear();
+ header_size = create_header_progress();
+ sql_result.append(m_buff_header.c_str(), header_size);
}
-#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
-#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"
+void aws_response_handler::init_stats_response()
+{
+ sql_result.resize(header_crc_size,'\0');
+ m_buff_header.clear();
+ header_size = create_header_stats();
+ sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::init_end_response()
+{
+ sql_result.resize(header_crc_size,'\0');
+ m_buff_header.clear();
+ header_size = create_header_end();
+ sql_result.append(m_buff_header.c_str(), header_size);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::init_error_response(const char *error_message)
+{ //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
+ m_buff_header.clear();
+ header_size = create_error_header_records(error_message);
+ sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::send_success_response()
+{
+ sql_result.append(END_PAYLOAD_LINE);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_error_response(const char *error_code,
+ const char *error_message,
+ const char *resource_id)
+{
+
+ set_req_state_err(s, 0); //TODO what err_no?
+ dump_errno(s, 400);
+ end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
+ dump_start(s);
+
+ s->formatter->open_object_section("Error");
+
+ s->formatter->dump_string("Code", error_code);
+ s->formatter->dump_string("Message", error_message);
+ s->formatter->dump_string("Resource", "#Resource#");
+ s->formatter->dump_string("RequestId", resource_id);
+
+ s->formatter->close_section();
+
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_progress_response()
+{
+ std::string progress_payload="<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>" + to_string(get_processed_size()) +
+ "</BytesScanned><BytesProcessed>" + to_string(get_processed_size()) + "</BytesProcessed>" +
+ "<BytesReturned>" + to_string(get_total_bytes_returned()) + "</BytesReturned></Progress>";
+
+ sql_result.append(progress_payload);
+ int buff_len = create_message(header_size);
+
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_stats_response()
+{
+ std::string stats_payload="<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>" + to_string(get_processed_size()) +
+ "</BytesScanned><BytesProcessed>" + to_string(get_processed_size()) + "</BytesProcessed>" +
+ "<BytesReturned>" + to_string(get_total_bytes_returned()) + "</BytesReturned></Stats>";
+
+ sql_result.append(stats_payload);
+ int buff_len = create_message(header_size);
+
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+ m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
+ chunk_number(0)
+{
+ set_get_data(true);
+}
+
+RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+{
+}
+
+int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
+{
+
+ //retrieve s3-select query from payload
+ bufferlist data;
+ int ret;
+ int max_size = 4096;
+ std::tie(ret, data) = read_all_input(s, max_size, false);
+ if (ret != 0) {
+ ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
+ return ret;
+ }
+
+ m_s3select_query = data.to_str();
+ if (m_s3select_query.length() > 0) {
+ ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
+ }
+ else {
+ ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
+ return -1;
+ }
+
+ int status = handle_aws_cli_parameters(m_sql_query);
+
+ if (status<0) {
+ return status;
+ }
+
+ return RGWGetObj_ObjStore_S3::get_params(y);
+}
int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
{
int status = 0;
+ uint32_t length_before_processing, length_post_processing;
csv_object::csv_defintions csv;
-
- m_result = "012345678901"; //12 positions for header-crc
-
- int header_size = 0;
+ const char* s3select_syntax_error = "s3select-Syntax-Error";
+ const char* s3select_resource_id = "resourcse-id";
+ const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
if (m_s3_csv_object==0) {
s3select_syntax->parse_query(query);
csv.escape_char = *m_escape_char.c_str();
}
+ if (m_enable_progress.compare("true")==0) {
+ enable_progress = true;
+ } else {
+ enable_progress = false;
+ }
+
+ if (output_row_delimiter.size()) {
+ csv.output_row_delimiter = *output_row_delimiter.c_str();
+ }
+
+ if (output_column_delimiter.size()) {
+ csv.output_column_delimiter = *output_column_delimiter.c_str();
+ }
+
+ if (output_quot.size()) {
+ csv.output_quot_char = *output_quot.c_str();
+ }
+
+ if (output_escape_char.size()) {
+ csv.output_escape_char = *output_escape_char.c_str();
+ }
+
+ if(output_quote_fields.compare("ALWAYS") == 0) {
+ csv.quote_fields_always = true;
+ }
+ else if(output_quote_fields.compare("ASNEEDED") == 0) {
+ csv.quote_fields_asneeded = true;
+ }
+
if(m_header_info.compare("IGNORE")==0) {
csv.ignore_header_info=true;
}
else if(m_header_info.compare("USE")==0) {
csv.use_header_info=true;
}
-
m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
}
- header_size = create_header_records(m_buff_header.get());
- m_result.append(m_buff_header.get(), header_size);
- m_result.append(PAYLOAD_LINE);
+ m_aws_response_handler->init_response();
+
+ if (s3select_syntax->get_error_description().empty() == false)
+ { //error-flow (syntax-error)
+ m_aws_response_handler->send_error_response(s3select_syntax_error,
+ s3select_syntax->get_error_description().c_str(),
+ s3select_resource_id);
- if (s3select_syntax->get_error_description().empty() == false) {
- m_result.append(s3select_syntax->get_error_description());
- ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl;
- status = -1;
+ ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl;
+ return -1;
}
- else {
- status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
- if(status<0) {
- m_result.append(m_s3_csv_object->get_error_description());
+ else
+ {
+ if (input == nullptr) {
+ input = "";
}
- }
+ m_aws_response_handler->init_success_response();
+ length_before_processing = (m_aws_response_handler->get_sql_result()).size();
+
+ //query is correct(syntax), processing is starting.
+ status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler->get_sql_result(), input, input_length, s->obj_size);
+ length_post_processing = (m_aws_response_handler->get_sql_result()).size();
+ m_aws_response_handler->set_total_bytes_returned(length_post_processing-length_before_processing);
+ if (status < 0)
+ { //error flow(processing-time)
+ m_aws_response_handler->send_error_response(s3select_processTime_error,
+ m_s3_csv_object->get_error_description().c_str(),
+ s3select_resource_id);
- if (m_result.size() > strlen(PAYLOAD_LINE)) {
- m_result.append(END_PAYLOAD_LINE);
- int buff_len = create_message(m_result, m_result.size() - 12, header_size);
- s->formatter->write_bin_data(m_result.data(), buff_len);
- if (op_ret < 0) {
- return op_ret;
+ ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl;
+ return -1;
}
+
+ if (chunk_number == 0)
+ {//success flow
+ if (op_ret < 0)
+ {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ // Explicitly use chunked transfer encoding so that we can stream the result
+ // to the user without having to wait for the full length of it.
+ end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+ }
+ chunk_number++;
+ }
+
+ if (length_post_processing-length_before_processing != 0) {
+ m_aws_response_handler->send_success_response();
+ } else {
+ m_aws_response_handler->send_continuation_response();
+ }
+
+ if (enable_progress == true) {
+ m_aws_response_handler->init_progress_response();
+ m_aws_response_handler->send_progress_response();
}
- rgw_flush_formatter_and_reset(s, s->formatter);
return status;
}
int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
{
+ std::string input_tag{"InputSerialization"};
+ std::string output_tag{"OutputSerialization"};
if(chunk_number !=0) {
return 0;
}
//AWS cli s3select parameters
- extract_by_tag("Expression", sql_query);
- extract_by_tag("FieldDelimiter", m_column_delimiter);
- extract_by_tag("QuoteCharacter", m_quot);
- extract_by_tag("RecordDelimiter", m_row_delimiter);
+ extract_by_tag(m_s3select_query, "Expression", sql_query);
+ extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
+
+ size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
+ size_t _qe = m_s3select_query.find("</" + input_tag + ">", _qi);
+ m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2));
+
+ extract_by_tag(m_s3select_input,"FieldDelimiter", m_column_delimiter);
+ extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
+ extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
+ extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
+
if (m_row_delimiter.size()==0) {
m_row_delimiter='\n';
}
+ else if(m_row_delimiter.compare(" ") == 0)
+ {//presto change
+ m_row_delimiter='\n';
+ }
+
+ extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char);
+ extract_by_tag(m_s3select_input, "CompressionType", m_compression_type);
+
+ size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0);
+ size_t _qs = m_s3select_query.find("</" + output_tag + ">", _qi);
+ m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2));
+
+ extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter);
+ extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot);
+ extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char);
+ extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields);
+ extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
- extract_by_tag("QuoteEscapeCharacter", m_escape_char);
- extract_by_tag("CompressionType", m_compression_type);
if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl;
return -1;
}
- extract_by_tag("FileHeaderInfo", m_header_info);
-
return 0;
}
-int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string tag_name, std::string& result)
+int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result)
{
result = "";
- size_t _qs = m_s3select_query.find("<" + tag_name + ">", 0) + tag_name.size() + 2;
+ size_t _qs = input.find("<" + tag_name + ">", 0);
+ size_t qs_input = _qs + tag_name.size() + 2;
if (_qs == std::string::npos) {
return -1;
}
- size_t _qe = m_s3select_query.find("</" + tag_name + ">", _qs);
+ size_t _qe = input.find("</" + tag_name + ">", qs_input);
if (_qe == std::string::npos) {
return -1;
}
- result = m_s3select_query.substr(_qs, _qe - _qs);
+ result = input.substr(qs_input, _qe - qs_input);
return 0;
}
int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
{
- if (len == 0) {
- return 0;
+ int status=0;
+ if (m_aws_response_handler == nullptr) {
+ m_aws_response_handler = std::make_unique<aws_response_handler>(s,this);
}
-
- if (chunk_number == 0) {
- if (op_ret < 0) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
+
+ if(len == 0 && s->obj_size != 0) {
+ return 0;
}
+
+ if (s->obj_size == 0) {
+ status = run_s3select(m_sql_query.c_str(), nullptr, 0);
+ } else {
auto bl_len = bl.get_num_buffers();
- // Explicitly use chunked transfer encoding so that we can stream the result
- // to the user without having to wait for the full length of it.
- if (chunk_number == 0) {
- end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
- }
-
- int status=0;
int i=0;
for(auto& it : bl.buffers()) {
<< " obj-size " << s->obj_size << dendl;
continue;
}
+
+ m_aws_response_handler->set_processed_size(it.length());
status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
if(status<0) {
break;
- }
+ }
i++;
}
+ }
- chunk_number++;
-
+ if (m_aws_response_handler->get_processed_size() == s->obj_size) {
+ if (status >=0) {
+ m_aws_response_handler->init_stats_response();
+ m_aws_response_handler->send_stats_response();
+ m_aws_response_handler->init_end_response();
+ }
+ }
return status;
}
class csv_object;
}
+
+class aws_response_handler
+{//TODO this class should reside on s3select submodule
+
+private:
+ std::string sql_result;
+ struct req_state *s;//TODO will be replace by callback
+ uint32_t header_size;
+ // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
+ boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
+ RGWOp *m_rgwop;
+ std::string m_buff_header;
+ uint64_t total_bytes_returned;
+ uint64_t processed_size;
+
+ enum header_name_En
+ {
+ EVENT_TYPE,
+ CONTENT_TYPE,
+ MESSAGE_TYPE,
+ ERROR_CODE,
+ ERROR_MESSAGE
+ };
+
+ enum header_value_En
+ {
+ RECORDS,
+ OCTET_STREAM,
+ EVENT,
+ CONT,
+ PROGRESS,
+ END,
+ XML,
+ STATS,
+ ENGINE_ERROR,
+ ERROR_TYPE
+ };
+
+ const char *PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
+ const char *END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
+ const char *header_name_str[5] = {":event-type", ":content-type", ":message-type",":error-code",":error-message"};
+ const char *header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error","error"};
+ static constexpr size_t header_crc_size = 12;
+
+ void push_header(const char * header_name,const char* header_value);
+
+ int create_message(u_int32_t header_len);
+
+public:
+ //12 positions for header-crc
+ aws_response_handler(struct req_state *ps,RGWOp *rgwop) : sql_result("012345678901"), s(ps),m_rgwop(rgwop),total_bytes_returned{0},processed_size{0}
+ { }
+
+ std::string &get_sql_result();
+
+ uint64_t get_processed_size();
+
+ void set_processed_size(uint64_t value);
+
+ uint64_t get_total_bytes_returned();
+
+ void set_total_bytes_returned(uint64_t value);
+
+ int create_header_records();
+
+ int create_header_continuation();
+
+ int create_header_progress();
+
+ int create_header_stats();
+
+ int create_header_end();
+
+ int create_error_header_records(const char* error_message);
+
+ void init_response();
+
+ void init_success_response();
+
+ void send_continuation_response();
+
+ void init_progress_response();
+
+ void init_end_response();
+
+ void init_stats_response();
+
+ void init_error_response(const char* error_message);
+
+ void send_success_response();
+
+ void send_progress_response();
+
+ void send_stats_response();
+
+ void send_error_response(const char* error_code,
+ const char* error_message,
+ const char* resource_id);
+
+}; //end class aws_response_handler
+
class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
{
private:
std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
std::string m_s3select_query;
- std::string m_result;
+ std::string m_s3select_input;
+ std::string m_s3select_output;
std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
std::string m_column_delimiter;
std::string m_quot;
std::string m_row_delimiter;
std::string m_compression_type;
std::string m_escape_char;
- std::unique_ptr<char[]> m_buff_header;
std::string m_header_info;
std::string m_sql_query;
+ std::string m_enable_progress;
+ std::string output_column_delimiter;
+ std::string output_quot;
+ std::string output_escape_char;
+ std::string output_quote_fields;
+ std::string output_row_delimiter;
+
+ std::unique_ptr<aws_response_handler> m_aws_response_handler;
+ bool enable_progress;
public:
unsigned int chunk_number;
- enum header_name_En
- {
- EVENT_TYPE,
- CONTENT_TYPE,
- MESSAGE_TYPE
- };
- static const char* header_name_str[3];
-
- enum header_value_En
- {
- RECORDS,
- OCTET_STREAM,
- EVENT
- };
- static const char* header_value_str[3];
-
RGWSelectObj_ObjStore_S3();
virtual ~RGWSelectObj_ObjStore_S3();
virtual int get_params(optional_yield y) override;
private:
- void encode_short(char* buff, uint16_t s, int& i);
-
- void encode_int(char* buff, u_int32_t s, int& i);
-
- int create_header_records(char* buff);
-
- std::unique_ptr<boost::crc_32_type> crc32;
-
- int create_message(std::string&, u_int32_t result_len, u_int32_t header_len);
int run_s3select(const char* query, const char* input, size_t input_length);
- int extract_by_tag(std::string tag_name, std::string& result);
+ int extract_by_tag(std::string input, std::string tag_name, std::string& result);
void convert_escape_seq(std::string& esc);