From: gal salomon Date: Fri, 7 May 2021 21:29:13 +0000 (+0300) Subject: s3select: refacor response handling (*) dedicate object is handling s3select response X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=ebf5fe623429ca8811161293efae4da3563174b6;p=ceph.git s3select: refacor response handling (*) dedicate object is handling s3select response Signed-off-by: gal salomon --- diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index e3b40bc75a64e..9ffcdfd30d02d 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -6127,15 +6127,198 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable( using namespace s3selectEngine; -const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"}; -const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"}; -RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3(): - s3select_syntax(std::make_unique()), - m_s3_csv_object(std::unique_ptr()), - m_buff_header(std::make_unique(1000)), - chunk_number(0), - crc32(std::unique_ptr()) +class aws_response_handler +{//TODO this class should reside on s3select submodule + +private: + std::string sql_result; + struct req_state *s;//TODO will be replace by callback + uint32_t header_size; + std::unique_ptr crc32; + RGWOp *m_rgwop; + std::string m_buff_header; + + enum header_name_En + { + EVENT_TYPE, + CONTENT_TYPE, + MESSAGE_TYPE, + ERROR_CODE, + ERROR_MESSAGE + + }; + + enum header_value_En + { + RECORDS, + OCTET_STREAM, + EVENT, + ENGINE_ERROR, + ERROR_TYPE + }; + + const char *PAYLOAD_LINE= "\n\n\n\n"; + const char *END_PAYLOAD_LINE= "\n"; + const char *header_name_str[5] = {":event-type", ":content-type", ":message-type","error-code","error-message"}; + const char *header_value_str[5] = {"Records", "application/octet-stream", "event","s3select-engine-error","error"}; + +public: + //12 positions for header-crc + aws_response_handler(struct req_state *ps,RGWOp *rgwop) : sql_result("012345678901"), s(ps),m_rgwop(rgwop) + { + // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum + crc32 = std::unique_ptr(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>); + } + + std::string &get_sql_result() + { + return sql_result; + } + + void push_header(const char * header_name,const char* header_value) + { + char x; + short s; + + x = char(strlen(header_name)); + m_buff_header.append(&x,sizeof(x)); + m_buff_header.append(header_name); + + x = char(7); + m_buff_header.append(&x,sizeof(x)); + + s = htons(uint16_t(strlen(header_value))); + m_buff_header.append(reinterpret_cast(&s),sizeof(s)); + m_buff_header.append(header_value); + } + + int create_header_records() + { + //headers description(AWS) + //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] + + //1 + push_header(header_name_str[EVENT_TYPE],header_value_str[RECORDS]); + //2 + push_header(header_name_str[CONTENT_TYPE],header_value_str[OCTET_STREAM]); + //3 + push_header(header_name_str[MESSAGE_TYPE],header_value_str[EVENT]); + + return m_buff_header.size(); + } + + int create_error_header_records(const char* error_message) + { + //headers description(AWS) + //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] + + //1 + push_header(header_name_str[ERROR_CODE],header_value_str[ENGINE_ERROR]); + //2 + push_header(header_name_str[ERROR_MESSAGE],error_message); + //3 + push_header(header_name_str[MESSAGE_TYPE],header_value_str[ERROR_TYPE]); + + return m_buff_header.size(); + } + + int create_message(u_int32_t header_len) + { + //message description(AWS): + //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4] + //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC + //are created later to the produced SQL result, and actually wrapping the payload. + + auto push_encode_int = [&](u_int32_t s,int pos) + { + u_int32_t x = htonl(s); + sql_result.replace(pos,sizeof(x),reinterpret_cast(&x),sizeof(x)); + }; + + + u_int32_t total_byte_len = 0; + u_int32_t preload_crc = 0; + u_int32_t message_crc = 0; + + total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size + + push_encode_int(total_byte_len,0); + push_encode_int(header_len,4); + + crc32->reset(); + *crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, *crc32); //crc for starting 8 bytes + preload_crc = (*crc32)(); + push_encode_int(preload_crc,8); + + crc32->reset(); + *crc32 = std::for_each(sql_result.begin(), sql_result.end(), *crc32); //crc for payload + checksum + message_crc = (*crc32)(); + + u_int32_t x = htonl(message_crc); + sql_result.append(reinterpret_cast(&x), sizeof(x)); + + return sql_result.size(); + } + + void init_response() + { //12 positions for header-crc + sql_result = "012345678901"; + } + + void init_success_response() + { + m_buff_header.clear(); + header_size = create_header_records(); + sql_result.append(m_buff_header.c_str(), header_size); + sql_result.append(PAYLOAD_LINE); + } + + void init_error_response(const char* error_message) + {//currently not in use. the headers in the case of error, are not extracted by AWS-cli. + m_buff_header.clear(); + header_size = create_error_header_records(error_message); + sql_result.append(m_buff_header, header_size); + } + + void send_success_response() + { + if (sql_result.size() > strlen(PAYLOAD_LINE)) + { + sql_result.append(END_PAYLOAD_LINE); + int buff_len = create_message(header_size); + s->formatter->write_bin_data(sql_result.data(), buff_len); + } + rgw_flush_formatter_and_reset(s, s->formatter); + } + + void send_error_response(const char* error_code, + const char* error_message, + const char* resource_id) + { + + set_req_state_err(s, 0);//TODO what err_no? + dump_errno(s, 400); + end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING); + dump_start(s); + + s->formatter->open_object_section("Error"); + + s->formatter->dump_string("Code", error_code); + s->formatter->dump_string("Message", error_message); + s->formatter->dump_string("Resource", "#Resource#"); + s->formatter->dump_string("RequestId", resource_id); + + s->formatter->close_section(); + + rgw_flush_formatter_and_reset(s, s->formatter); + } + +}; //end class aws_response_handler + +RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique()), + m_s3_csv_object(std::unique_ptr()), + chunk_number(0) { set_get_data(true); } @@ -6175,108 +6358,13 @@ int RGWSelectObj_ObjStore_S3::get_params(optional_yield y) return RGWGetObj_ObjStore_S3::get_params(y); } -void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i) -{ - short x = htons(s); - memcpy(buff, &x, sizeof(s)); - i+=sizeof(s); -} - -void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i) -{ - u_int32_t x = htonl(s); - memcpy(buff, &x, sizeof(s)); - i+=sizeof(s); -} - -int RGWSelectObj_ObjStore_S3::create_header_records(char* buff) -{ - int i = 0; - - //headers description(AWS) - //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] - - //1 - buff[i++] = char(strlen(header_name_str[EVENT_TYPE])); - memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE])); - i += strlen(header_name_str[EVENT_TYPE]); - buff[i++] = char(7); - encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i); - memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS])); - i += strlen(header_value_str[RECORDS]); - - //2 - buff[i++] = char(strlen(header_name_str[CONTENT_TYPE])); - memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE])); - i += strlen(header_name_str[CONTENT_TYPE]); - buff[i++] = char(7); - encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i); - memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM])); - i += strlen(header_value_str[OCTET_STREAM]); - - //3 - buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE])); - memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE])); - i += strlen(header_name_str[MESSAGE_TYPE]); - buff[i++] = char(7); - encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i); - memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT])); - i += strlen(header_value_str[EVENT]); - - return i; -} - -int RGWSelectObj_ObjStore_S3::create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len) -{ - //message description(AWS): - //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4] - //s3select result is produced into m_result, the m_result is also the response-message, thus the attach headers and CRC - //are created later to the produced SQL result, and actually wrapping the payload. - - u_int32_t total_byte_len = 0; - u_int32_t preload_crc = 0; - u_int32_t message_crc = 0; - int i = 0; - char * buff = out_string.data(); - - if(crc32 ==0) { - // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum - crc32 = std::unique_ptr(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>); - } - - total_byte_len = result_len + 16;//the total is greater in 4 bytes than current size - - encode_int(&buff[i], total_byte_len, i);//store sizes at the beginning of the buffer - encode_int(&buff[i], header_len, i); - - crc32->reset(); - *crc32 = std::for_each( buff, buff + 8, *crc32 );//crc for starting 8 bytes - preload_crc = (*crc32)(); - encode_int(&buff[i], preload_crc, i); - - i += result_len;//advance to the end of payload. - - crc32->reset(); - *crc32 = std::for_each( buff, buff + i, *crc32 );//crc for payload + checksum - message_crc = (*crc32)(); - char out_encode[4]; - encode_int(out_encode, message_crc, i); - out_string.append(out_encode,sizeof(out_encode)); - - return i; -} - -#define PAYLOAD_LINE "\n\n\n\n" -#define END_PAYLOAD_LINE "\n" - int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length) { int status = 0; csv_object::csv_defintions csv; - - m_result = "012345678901"; //12 positions for header-crc - - int header_size = 0; + const char* s3select_syntax_error = "s3select-Syntax-Error"; + const char* s3select_resource_id = "resourcse-id"; + const char* s3select_processTime_error = "s3select-ProcessingTime-Error"; if (m_s3_csv_object==0) { s3select_syntax->parse_query(query); @@ -6304,34 +6392,54 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, csv.use_header_info=true; } + m_aws_response_handler = std::make_unique(s,this); m_s3_csv_object = std::unique_ptr(new s3selectEngine::csv_object(s3select_syntax.get(), csv)); } - header_size = create_header_records(m_buff_header.get()); - m_result.append(m_buff_header.get(), header_size); - m_result.append(PAYLOAD_LINE); + m_aws_response_handler.get()->init_response(); - if (s3select_syntax->get_error_description().empty() == false) { - m_result.append(s3select_syntax->get_error_description()); - ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl; - status = -1; + if (s3select_syntax->get_error_description().empty() == false) + { //error-flow (syntax-error) + m_aws_response_handler.get()->send_error_response(s3select_syntax_error, + s3select_syntax->get_error_description().c_str(), + s3select_resource_id); + + ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl; + return -1; } - else { - status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size); - if(status<0) { - m_result.append(m_s3_csv_object->get_error_description()); + else + { + + m_aws_response_handler.get()->init_success_response(); + + //query is correct(syntax), processing is starting. + status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler.get()->get_sql_result(), input, input_length, s->obj_size); + if (status < 0) + { //error flow(processing-time) + m_aws_response_handler.get()->send_error_response(s3select_processTime_error, + m_s3_csv_object->get_error_description().c_str(), + s3select_resource_id); + + ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl; + return -1; } - } - if (m_result.size() > strlen(PAYLOAD_LINE)) { - m_result.append(END_PAYLOAD_LINE); - int buff_len = create_message(m_result, m_result.size() - 12, header_size); - s->formatter->write_bin_data(m_result.data(), buff_len); - if (op_ret < 0) { - return op_ret; + if (chunk_number == 0) + {//success flow + if (op_ret < 0) + { + set_req_state_err(s, op_ret); + } + dump_errno(s); + // Explicitly use chunked transfer encoding so that we can stream the result + // to the user without having to wait for the full length of it. + end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING); } + + chunk_number++; } - rgw_flush_formatter_and_reset(s, s->formatter); + + m_aws_response_handler.get()->send_success_response(); return status; } @@ -6396,21 +6504,8 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_ return 0; } - if (chunk_number == 0) { - if (op_ret < 0) { - set_req_state_err(s, op_ret); - } - dump_errno(s); - } - auto bl_len = bl.get_num_buffers(); - // Explicitly use chunked transfer encoding so that we can stream the result - // to the user without having to wait for the full length of it. - if (chunk_number == 0) { - end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING); - } - int status=0; int i=0; @@ -6432,7 +6527,6 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_ i++; } - chunk_number++; return status; } diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index fd539a2553b23..5db02d128326d 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -914,42 +914,27 @@ class s3select; class csv_object; } +class aws_response_handler; + class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3 { private: std::unique_ptr s3select_syntax; std::string m_s3select_query; - std::string m_result; std::unique_ptr m_s3_csv_object; std::string m_column_delimiter; std::string m_quot; std::string m_row_delimiter; std::string m_compression_type; std::string m_escape_char; - std::unique_ptr m_buff_header; std::string m_header_info; std::string m_sql_query; + std::unique_ptr m_aws_response_handler; public: unsigned int chunk_number; - enum header_name_En - { - EVENT_TYPE, - CONTENT_TYPE, - MESSAGE_TYPE - }; - static const char* header_name_str[3]; - - enum header_value_En - { - RECORDS, - OCTET_STREAM, - EVENT - }; - static const char* header_value_str[3]; - RGWSelectObj_ObjStore_S3(); virtual ~RGWSelectObj_ObjStore_S3(); @@ -958,15 +943,6 @@ public: virtual int get_params(optional_yield y) override; private: - void encode_short(char* buff, uint16_t s, int& i); - - void encode_int(char* buff, u_int32_t s, int& i); - - int create_header_records(char* buff); - - std::unique_ptr crc32; - - int create_message(std::string&, u_int32_t result_len, u_int32_t header_len); int run_s3select(const char* query, const char* input, size_t input_length);