From: gal salomon Date: Fri, 7 May 2021 21:29:13 +0000 (+0300) Subject: RGW: Implement continuation, progress, stats, end s3select response X-Git-Tag: v17.1.0~207^2~8 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f128f04c60ef2c14f5812153bf2971df9dca15c2;p=ceph.git RGW: Implement continuation, progress, stats, end s3select response RGW/S3select: Implement output-serialization. user may request different CSV definitions for output (field delimiter, row delimiter, quote handling). RGW/S3select: Implement presto-alignments. presto-application sends queries with table-alias, case insensitive, and with no-semicolon at the end of statement. RGW/s3select: zero object size issue Signed-off-by: Albin Antony Signed-off-by: galsalomon66 --- diff --git a/qa/suites/rgw/verify/tasks/s3tests.yaml b/qa/suites/rgw/verify/tasks/s3tests.yaml index b5aef9b044b83..b9934fea37bbf 100644 --- a/qa/suites/rgw/verify/tasks/s3tests.yaml +++ b/qa/suites/rgw/verify/tasks/s3tests.yaml @@ -1,5 +1,6 @@ tasks: - s3tests: client.0: - force-branch: ceph-master + force-branch: progress-stats + git_remote: https://github.com/galsalomon66/ rgw_server: client.0 diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index 4630652487393..426a23324bc21 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -6128,156 +6128,327 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable( using namespace s3selectEngine; -const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"}; -const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"}; -RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3(): - s3select_syntax(std::make_unique()), - m_s3_csv_object(std::unique_ptr()), - m_buff_header(std::make_unique(1000)), - chunk_number(0), - crc32(std::unique_ptr()) +std::string &aws_response_handler::get_sql_result() { - set_get_data(true); + return sql_result; } 
-RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3() +uint64_t aws_response_handler::get_processed_size() { + return processed_size; } -int RGWSelectObj_ObjStore_S3::get_params(optional_yield y) +void aws_response_handler::set_processed_size(uint64_t value) { + processed_size += value; +} - //retrieve s3-select query from payload - bufferlist data; - int ret; - int max_size = 4096; - std::tie(ret, data) = read_all_input(s, max_size, false); - if (ret != 0) { - ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl; - return ret; - } +uint64_t aws_response_handler::get_total_bytes_returned() +{ + return total_bytes_returned; +} - m_s3select_query = data.to_str(); - if (m_s3select_query.length() > 0) { - ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl; - } - else { - ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl; - return -1; - } +void aws_response_handler::set_total_bytes_returned(uint64_t value) +{ + total_bytes_returned += value; +} - int status = handle_aws_cli_parameters(m_sql_query); +void aws_response_handler::push_header(const char *header_name, const char *header_value) +{ + char x; + short s; - if (status<0) { - return status; - } + x = char(strlen(header_name)); + m_buff_header.append(&x, sizeof(x)); + m_buff_header.append(header_name); - return RGWGetObj_ObjStore_S3::get_params(y); + x = char(7); + m_buff_header.append(&x, sizeof(x)); + + s = htons(uint16_t(strlen(header_value))); + m_buff_header.append(reinterpret_cast(&s), sizeof(s)); + m_buff_header.append(header_value); } -void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i) +int aws_response_handler::create_header_records() { - short x = htons(s); - memcpy(buff, &x, sizeof(s)); - i+=sizeof(s); + //headers description(AWS) + //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] + + //1 + push_header(header_name_str[EVENT_TYPE], 
header_value_str[RECORDS]); + //2 + push_header(header_name_str[CONTENT_TYPE], header_value_str[OCTET_STREAM]); + //3 + push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]); + + return m_buff_header.size(); } -void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i) +int aws_response_handler::create_header_continuation() { - u_int32_t x = htonl(s); - memcpy(buff, &x, sizeof(s)); - i+=sizeof(s); + //headers description(AWS) + //1 + push_header(header_name_str[EVENT_TYPE], header_value_str[CONT]); + //2 + push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]); + + return m_buff_header.size(); } -int RGWSelectObj_ObjStore_S3::create_header_records(char* buff) +int aws_response_handler::create_header_progress() { - int i = 0; + //headers description(AWS) + //1 + push_header(header_name_str[EVENT_TYPE], header_value_str[PROGRESS]); + //2 + push_header(header_name_str[CONTENT_TYPE], header_value_str[XML]); + //3 + push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]); + + return m_buff_header.size(); +} +int aws_response_handler::create_header_stats() +{ //headers description(AWS) - //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] - //1 - buff[i++] = char(strlen(header_name_str[EVENT_TYPE])); - memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE])); - i += strlen(header_name_str[EVENT_TYPE]); - buff[i++] = char(7); - encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i); - memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS])); - i += strlen(header_value_str[RECORDS]); + push_header(header_name_str[EVENT_TYPE], header_value_str[STATS]); + //2 + push_header(header_name_str[CONTENT_TYPE], header_value_str[XML]); + //3 + push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]); + + return m_buff_header.size(); +} +int aws_response_handler::create_header_end() +{ + //headers 
description(AWS) + //1 + push_header(header_name_str[EVENT_TYPE], header_value_str[END]); //2 - buff[i++] = char(strlen(header_name_str[CONTENT_TYPE])); - memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE])); - i += strlen(header_name_str[CONTENT_TYPE]); - buff[i++] = char(7); - encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i); - memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM])); - i += strlen(header_value_str[OCTET_STREAM]); + push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]); + + return m_buff_header.size(); +} + +int aws_response_handler::create_error_header_records(const char *error_message) +{ + //headers description(AWS) + //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] + //1 + push_header(header_name_str[ERROR_CODE], header_value_str[ENGINE_ERROR]); + //2 + push_header(header_name_str[ERROR_MESSAGE], error_message); //3 - buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE])); - memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE])); - i += strlen(header_name_str[MESSAGE_TYPE]); - buff[i++] = char(7); - encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i); - memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT])); - i += strlen(header_value_str[EVENT]); + push_header(header_name_str[MESSAGE_TYPE], header_value_str[ERROR_TYPE]); - return i; + return m_buff_header.size(); } -int RGWSelectObj_ObjStore_S3::create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len) +int aws_response_handler::create_message(u_int32_t header_len) { - //message description(AWS): + //message description(AWS): //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4] - //s3select result is produced into m_result, the m_result is also the response-message, thus the 
attach headers and CRC + //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC //are created later to the produced SQL result, and actually wrapping the payload. + auto push_encode_int = [&](u_int32_t s, int pos) { + u_int32_t x = htonl(s); + sql_result.replace(pos, sizeof(x), reinterpret_cast(&x), sizeof(x)); + }; + u_int32_t total_byte_len = 0; u_int32_t preload_crc = 0; u_int32_t message_crc = 0; - int i = 0; - char * buff = out_string.data(); - if(crc32 ==0) { - // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum - crc32 = std::unique_ptr(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>); - } + total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size - total_byte_len = result_len + 16;//the total is greater in 4 bytes than current size + push_encode_int(total_byte_len, 0); + push_encode_int(header_len, 4); - encode_int(&buff[i], total_byte_len, i);//store sizes at the beginning of the buffer - encode_int(&buff[i], header_len, i); + crc32.reset(); + crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes + preload_crc = crc32(); + push_encode_int(preload_crc, 8); - crc32->reset(); - *crc32 = std::for_each( buff, buff + 8, *crc32 );//crc for starting 8 bytes - preload_crc = (*crc32)(); - encode_int(&buff[i], preload_crc, i); + crc32.reset(); + crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum + message_crc = crc32(); - i += result_len;//advance to the end of payload. 
+ u_int32_t x = htonl(message_crc); + sql_result.append(reinterpret_cast(&x), sizeof(x)); - crc32->reset(); - *crc32 = std::for_each( buff, buff + i, *crc32 );//crc for payload + checksum - message_crc = (*crc32)(); - char out_encode[4]; - encode_int(out_encode, message_crc, i); - out_string.append(out_encode,sizeof(out_encode)); + return sql_result.size(); +} + +void aws_response_handler::init_response() +{ //12 positions for header-crc + sql_result.resize(header_crc_size,'\0'); +} + +void aws_response_handler::init_success_response() +{ + m_buff_header.clear(); + header_size = create_header_records(); + sql_result.append(m_buff_header.c_str(), header_size); + sql_result.append(PAYLOAD_LINE); +} + +void aws_response_handler::send_continuation_response() +{ + sql_result.resize(header_crc_size,'\0'); + m_buff_header.clear(); + header_size = create_header_continuation(); + sql_result.append(m_buff_header.c_str(), header_size); + int buff_len = create_message(header_size); + s->formatter->write_bin_data(sql_result.data(), buff_len); + rgw_flush_formatter_and_reset(s, s->formatter); +} - return i; +void aws_response_handler::init_progress_response() +{ + sql_result.resize(header_crc_size,'\0'); + m_buff_header.clear(); + header_size = create_header_progress(); + sql_result.append(m_buff_header.c_str(), header_size); } -#define PAYLOAD_LINE "\n\n\n\n" -#define END_PAYLOAD_LINE "\n" +void aws_response_handler::init_stats_response() +{ + sql_result.resize(header_crc_size,'\0'); + m_buff_header.clear(); + header_size = create_header_stats(); + sql_result.append(m_buff_header.c_str(), header_size); +} + +void aws_response_handler::init_end_response() +{ + sql_result.resize(header_crc_size,'\0'); + m_buff_header.clear(); + header_size = create_header_end(); + sql_result.append(m_buff_header.c_str(), header_size); + int buff_len = create_message(header_size); + s->formatter->write_bin_data(sql_result.data(), buff_len); + rgw_flush_formatter_and_reset(s, s->formatter); +} + 
+void aws_response_handler::init_error_response(const char *error_message) +{ //currently not in use. the headers in the case of error, are not extracted by AWS-cli. + m_buff_header.clear(); + header_size = create_error_header_records(error_message); + sql_result.append(m_buff_header.c_str(), header_size); +} + +void aws_response_handler::send_success_response() +{ + sql_result.append(END_PAYLOAD_LINE); + int buff_len = create_message(header_size); + s->formatter->write_bin_data(sql_result.data(), buff_len); + rgw_flush_formatter_and_reset(s, s->formatter); +} + +void aws_response_handler::send_error_response(const char *error_code, + const char *error_message, + const char *resource_id) +{ + + set_req_state_err(s, 0); //TODO what err_no? + dump_errno(s, 400); + end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING); + dump_start(s); + + s->formatter->open_object_section("Error"); + + s->formatter->dump_string("Code", error_code); + s->formatter->dump_string("Message", error_message); + s->formatter->dump_string("Resource", "#Resource#"); + s->formatter->dump_string("RequestId", resource_id); + + s->formatter->close_section(); + + rgw_flush_formatter_and_reset(s, s->formatter); +} + +void aws_response_handler::send_progress_response() +{ + std::string progress_payload="" + to_string(get_processed_size()) + + "" + to_string(get_processed_size()) + "" + + "" + to_string(get_total_bytes_returned()) + ""; + + sql_result.append(progress_payload); + int buff_len = create_message(header_size); + + s->formatter->write_bin_data(sql_result.data(), buff_len); + rgw_flush_formatter_and_reset(s, s->formatter); +} + +void aws_response_handler::send_stats_response() +{ + std::string stats_payload="" + to_string(get_processed_size()) + + "" + to_string(get_processed_size()) + "" + + "" + to_string(get_total_bytes_returned()) + ""; + + sql_result.append(stats_payload); + int buff_len = create_message(header_size); + + s->formatter->write_bin_data(sql_result.data(), 
buff_len); + rgw_flush_formatter_and_reset(s, s->formatter); +} + +RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique()), + m_s3_csv_object(std::unique_ptr()), + chunk_number(0) +{ + set_get_data(true); +} + +RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3() +{ +} + +int RGWSelectObj_ObjStore_S3::get_params(optional_yield y) +{ + + //retrieve s3-select query from payload + bufferlist data; + int ret; + int max_size = 4096; + std::tie(ret, data) = read_all_input(s, max_size, false); + if (ret != 0) { + ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl; + return ret; + } + + m_s3select_query = data.to_str(); + if (m_s3select_query.length() > 0) { + ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl; + } + else { + ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl; + return -1; + } + + int status = handle_aws_cli_parameters(m_sql_query); + + if (status<0) { + return status; + } + + return RGWGetObj_ObjStore_S3::get_params(y); +} int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length) { int status = 0; + uint32_t length_before_processing, length_post_processing; csv_object::csv_defintions csv; - - m_result = "012345678901"; //12 positions for header-crc - - int header_size = 0; + const char* s3select_syntax_error = "s3select-Syntax-Error"; + const char* s3select_resource_id = "resourcse-id"; + const char* s3select_processTime_error = "s3select-ProcessingTime-Error"; if (m_s3_csv_object==0) { s3select_syntax->parse_query(query); @@ -6298,47 +6469,109 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, csv.escape_char = *m_escape_char.c_str(); } + if (m_enable_progress.compare("true")==0) { + enable_progress = true; + } else { + enable_progress = false; + } + + if (output_row_delimiter.size()) { + csv.output_row_delimiter = *output_row_delimiter.c_str(); + } + + if 
(output_column_delimiter.size()) { + csv.output_column_delimiter = *output_column_delimiter.c_str(); + } + + if (output_quot.size()) { + csv.output_quot_char = *output_quot.c_str(); + } + + if (output_escape_char.size()) { + csv.output_escape_char = *output_escape_char.c_str(); + } + + if(output_quote_fields.compare("ALWAYS") == 0) { + csv.quote_fields_always = true; + } + else if(output_quote_fields.compare("ASNEEDED") == 0) { + csv.quote_fields_asneeded = true; + } + if(m_header_info.compare("IGNORE")==0) { csv.ignore_header_info=true; } else if(m_header_info.compare("USE")==0) { csv.use_header_info=true; } - m_s3_csv_object = std::unique_ptr(new s3selectEngine::csv_object(s3select_syntax.get(), csv)); } - header_size = create_header_records(m_buff_header.get()); - m_result.append(m_buff_header.get(), header_size); - m_result.append(PAYLOAD_LINE); + m_aws_response_handler->init_response(); + + if (s3select_syntax->get_error_description().empty() == false) + { //error-flow (syntax-error) + m_aws_response_handler->send_error_response(s3select_syntax_error, + s3select_syntax->get_error_description().c_str(), + s3select_resource_id); - if (s3select_syntax->get_error_description().empty() == false) { - m_result.append(s3select_syntax->get_error_description()); - ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl; - status = -1; + ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl; + return -1; } - else { - status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size); - if(status<0) { - m_result.append(m_s3_csv_object->get_error_description()); + else + { + if (input == nullptr) { + input = ""; } - } + m_aws_response_handler->init_success_response(); + length_before_processing = (m_aws_response_handler->get_sql_result()).size(); + + //query is correct(syntax), processing is starting. 
+ status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler->get_sql_result(), input, input_length, s->obj_size); + length_post_processing = (m_aws_response_handler->get_sql_result()).size(); + m_aws_response_handler->set_total_bytes_returned(length_post_processing-length_before_processing); + if (status < 0) + { //error flow(processing-time) + m_aws_response_handler->send_error_response(s3select_processTime_error, + m_s3_csv_object->get_error_description().c_str(), + s3select_resource_id); - if (m_result.size() > strlen(PAYLOAD_LINE)) { - m_result.append(END_PAYLOAD_LINE); - int buff_len = create_message(m_result, m_result.size() - 12, header_size); - s->formatter->write_bin_data(m_result.data(), buff_len); - if (op_ret < 0) { - return op_ret; + ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl; + return -1; } + + if (chunk_number == 0) + {//success flow + if (op_ret < 0) + { + set_req_state_err(s, op_ret); + } + dump_errno(s); + // Explicitly use chunked transfer encoding so that we can stream the result + // to the user without having to wait for the full length of it. 
+ end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING); + } + chunk_number++; + } + + if (length_post_processing-length_before_processing != 0) { + m_aws_response_handler->send_success_response(); + } else { + m_aws_response_handler->send_continuation_response(); + } + + if (enable_progress == true) { + m_aws_response_handler->init_progress_response(); + m_aws_response_handler->send_progress_response(); } - rgw_flush_formatter_and_reset(s, s->formatter); return status; } int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query) { + std::string input_tag{"InputSerialization"}; + std::string output_tag{"OutputSerialization"}; if(chunk_number !=0) { return 0; @@ -6354,65 +6587,82 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query) } //AWS cli s3select parameters - extract_by_tag("Expression", sql_query); - extract_by_tag("FieldDelimiter", m_column_delimiter); - extract_by_tag("QuoteCharacter", m_quot); - extract_by_tag("RecordDelimiter", m_row_delimiter); + extract_by_tag(m_s3select_query, "Expression", sql_query); + extract_by_tag(m_s3select_query, "Enabled", m_enable_progress); + + size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0); + size_t _qe = m_s3select_query.find("", _qi); + m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2)); + + extract_by_tag(m_s3select_input,"FieldDelimiter", m_column_delimiter); + extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot); + extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter); + extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info); + if (m_row_delimiter.size()==0) { m_row_delimiter='\n'; } + else if(m_row_delimiter.compare(" ") == 0) + {//presto change + m_row_delimiter='\n'; + } + + extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char); + extract_by_tag(m_s3select_input, "CompressionType", m_compression_type); + + size_t _qo = 
m_s3select_query.find("<" + output_tag + ">", 0); + size_t _qs = m_s3select_query.find("", _qi); + m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2)); + + extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter); + extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot); + extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char); + extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields); + extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter); - extract_by_tag("QuoteEscapeCharacter", m_escape_char); - extract_by_tag("CompressionType", m_compression_type); if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) { ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl; return -1; } - extract_by_tag("FileHeaderInfo", m_header_info); - return 0; } -int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string tag_name, std::string& result) +int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result) { result = ""; - size_t _qs = m_s3select_query.find("<" + tag_name + ">", 0) + tag_name.size() + 2; + size_t _qs = input.find("<" + tag_name + ">", 0); + size_t qs_input = _qs + tag_name.size() + 2; if (_qs == std::string::npos) { return -1; } - size_t _qe = m_s3select_query.find("", _qs); + size_t _qe = input.find("", qs_input); if (_qe == std::string::npos) { return -1; } - result = m_s3select_query.substr(_qs, _qe - _qs); + result = input.substr(qs_input, _qe - qs_input); return 0; } int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len) { - if (len == 0) { - return 0; + int status=0; + if (m_aws_response_handler == nullptr) { + m_aws_response_handler = std::make_unique(s,this); } - - if (chunk_number == 0) { - if (op_ret < 0) { - set_req_state_err(s, op_ret); - } - dump_errno(s); + + if(len == 0 
&& s->obj_size != 0) { + return 0; } + + if (s->obj_size == 0) { + status = run_s3select(m_sql_query.c_str(), nullptr, 0); + } else { auto bl_len = bl.get_num_buffers(); - // Explicitly use chunked transfer encoding so that we can stream the result - // to the user without having to wait for the full length of it. - if (chunk_number == 0) { - end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING); - } - - int status=0; int i=0; for(auto& it : bl.buffers()) { @@ -6425,15 +6675,23 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_ << " obj-size " << s->obj_size << dendl; continue; } + + m_aws_response_handler->set_processed_size(it.length()); status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length()); if(status<0) { break; - } + } i++; } + } - chunk_number++; - + if (m_aws_response_handler->get_processed_size() == s->obj_size) { + if (status >=0) { + m_aws_response_handler->init_stats_response(); + m_aws_response_handler->send_stats_response(); + m_aws_response_handler->init_end_response(); + } + } return status; } diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index fd539a2553b23..92a7cf2288b7d 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -914,42 +914,136 @@ class s3select; class csv_object; } + +class aws_response_handler +{//TODO this class should reside on s3select submodule + +private: + std::string sql_result; + struct req_state *s;//TODO will be replace by callback + uint32_t header_size; + // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum + boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32; + RGWOp *m_rgwop; + std::string m_buff_header; + uint64_t total_bytes_returned; + uint64_t processed_size; + + enum header_name_En + { + EVENT_TYPE, + CONTENT_TYPE, + MESSAGE_TYPE, + ERROR_CODE, + ERROR_MESSAGE + }; + + enum header_value_En + { + RECORDS, + OCTET_STREAM, + EVENT, + CONT, + PROGRESS, + END, + XML, + STATS, + 
ENGINE_ERROR, + ERROR_TYPE + }; + + const char *PAYLOAD_LINE= "\n\n\n\n"; + const char *END_PAYLOAD_LINE= "\n"; + const char *header_name_str[5] = {":event-type", ":content-type", ":message-type",":error-code",":error-message"}; + const char *header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error","error"}; + static constexpr size_t header_crc_size = 12; + + void push_header(const char * header_name,const char* header_value); + + int create_message(u_int32_t header_len); + +public: + //12 positions for header-crc + aws_response_handler(struct req_state *ps,RGWOp *rgwop) : sql_result("012345678901"), s(ps),m_rgwop(rgwop),total_bytes_returned{0},processed_size{0} + { } + + std::string &get_sql_result(); + + uint64_t get_processed_size(); + + void set_processed_size(uint64_t value); + + uint64_t get_total_bytes_returned(); + + void set_total_bytes_returned(uint64_t value); + + int create_header_records(); + + int create_header_continuation(); + + int create_header_progress(); + + int create_header_stats(); + + int create_header_end(); + + int create_error_header_records(const char* error_message); + + void init_response(); + + void init_success_response(); + + void send_continuation_response(); + + void init_progress_response(); + + void init_end_response(); + + void init_stats_response(); + + void init_error_response(const char* error_message); + + void send_success_response(); + + void send_progress_response(); + + void send_stats_response(); + + void send_error_response(const char* error_code, + const char* error_message, + const char* resource_id); + +}; //end class aws_response_handler + class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3 { private: std::unique_ptr s3select_syntax; std::string m_s3select_query; - std::string m_result; + std::string m_s3select_input; + std::string m_s3select_output; std::unique_ptr m_s3_csv_object; std::string m_column_delimiter; 
std::string m_quot; std::string m_row_delimiter; std::string m_compression_type; std::string m_escape_char; - std::unique_ptr m_buff_header; std::string m_header_info; std::string m_sql_query; + std::string m_enable_progress; + std::string output_column_delimiter; + std::string output_quot; + std::string output_escape_char; + std::string output_quote_fields; + std::string output_row_delimiter; + + std::unique_ptr m_aws_response_handler; + bool enable_progress; public: unsigned int chunk_number; - enum header_name_En - { - EVENT_TYPE, - CONTENT_TYPE, - MESSAGE_TYPE - }; - static const char* header_name_str[3]; - - enum header_value_En - { - RECORDS, - OCTET_STREAM, - EVENT - }; - static const char* header_value_str[3]; - RGWSelectObj_ObjStore_S3(); virtual ~RGWSelectObj_ObjStore_S3(); @@ -958,19 +1052,10 @@ public: virtual int get_params(optional_yield y) override; private: - void encode_short(char* buff, uint16_t s, int& i); - - void encode_int(char* buff, u_int32_t s, int& i); - - int create_header_records(char* buff); - - std::unique_ptr crc32; - - int create_message(std::string&, u_int32_t result_len, u_int32_t header_len); int run_s3select(const char* query, const char* input, size_t input_length); - int extract_by_tag(std::string tag_name, std::string& result); + int extract_by_tag(std::string input, std::string tag_name, std::string& result); void convert_escape_seq(std::string& esc); diff --git a/src/s3select b/src/s3select index 63129ea4d2777..922561ce6bb90 160000 --- a/src/s3select +++ b/src/s3select @@ -1 +1 @@ -Subproject commit 63129ea4d2777204d0ddc2786c11062b6884a88b +Subproject commit 922561ce6bb9017b85d74b9a1a43a52af1ed65ae