using namespace s3selectEngine;
-const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
-const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"};
-RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
- s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
- m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
- m_buff_header(std::make_unique<char[]>(1000)),
- chunk_number(0),
- crc32(std::unique_ptr<boost::crc_32_type>())
+class aws_response_handler
+{//TODO this class should reside on s3select submodule
+
+private:
+ std::string sql_result;
+ struct req_state *s;//TODO will be replace by callback
+ uint32_t header_size;
+ std::unique_ptr<boost::crc_32_type> crc32;
+ RGWOp *m_rgwop;
+ std::string m_buff_header;
+
+ enum header_name_En
+ {
+ EVENT_TYPE,
+ CONTENT_TYPE,
+ MESSAGE_TYPE,
+ ERROR_CODE,
+ ERROR_MESSAGE
+
+ };
+
+ enum header_value_En
+ {
+ RECORDS,
+ OCTET_STREAM,
+ EVENT,
+ ENGINE_ERROR,
+ ERROR_TYPE
+ };
+
+ const char *PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
+ const char *END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
+ const char *header_name_str[5] = {":event-type", ":content-type", ":message-type","error-code","error-message"};
+ const char *header_value_str[5] = {"Records", "application/octet-stream", "event","s3select-engine-error","error"};
+
+public:
+ //12 positions for header-crc
+ aws_response_handler(struct req_state *ps,RGWOp *rgwop) : sql_result("012345678901"), s(ps),m_rgwop(rgwop)
+ {
+ // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
+ crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
+ }
+
+ std::string &get_sql_result()
+ {
+ return sql_result;
+ }
+
+ void push_header(const char * header_name,const char* header_value)
+ {
+ char x;
+ short s;
+
+ x = char(strlen(header_name));
+ m_buff_header.append(&x,sizeof(x));
+ m_buff_header.append(header_name);
+
+ x = char(7);
+ m_buff_header.append(&x,sizeof(x));
+
+ s = htons(uint16_t(strlen(header_value)));
+ m_buff_header.append(reinterpret_cast<char*>(&s),sizeof(s));
+ m_buff_header.append(header_value);
+ }
+
+ int create_header_records()
+ {
+ //headers description(AWS)
+ //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+
+ //1
+ push_header(header_name_str[EVENT_TYPE],header_value_str[RECORDS]);
+ //2
+ push_header(header_name_str[CONTENT_TYPE],header_value_str[OCTET_STREAM]);
+ //3
+ push_header(header_name_str[MESSAGE_TYPE],header_value_str[EVENT]);
+
+ return m_buff_header.size();
+ }
+
+ int create_error_header_records(const char* error_message)
+ {
+ //headers description(AWS)
+ //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+
+ //1
+ push_header(header_name_str[ERROR_CODE],header_value_str[ENGINE_ERROR]);
+ //2
+ push_header(header_name_str[ERROR_MESSAGE],error_message);
+ //3
+ push_header(header_name_str[MESSAGE_TYPE],header_value_str[ERROR_TYPE]);
+
+ return m_buff_header.size();
+ }
+
+ int create_message(u_int32_t header_len)
+ {
+ //message description(AWS):
+ //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
+ //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
+ //are created later to the produced SQL result, and actually wrapping the payload.
+
+ auto push_encode_int = [&](u_int32_t s,int pos)
+ {
+ u_int32_t x = htonl(s);
+ sql_result.replace(pos,sizeof(x),reinterpret_cast<char*>(&x),sizeof(x));
+ };
+
+
+ u_int32_t total_byte_len = 0;
+ u_int32_t preload_crc = 0;
+ u_int32_t message_crc = 0;
+
+ total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
+
+ push_encode_int(total_byte_len,0);
+ push_encode_int(header_len,4);
+
+ crc32->reset();
+ *crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, *crc32); //crc for starting 8 bytes
+ preload_crc = (*crc32)();
+ push_encode_int(preload_crc,8);
+
+ crc32->reset();
+ *crc32 = std::for_each(sql_result.begin(), sql_result.end(), *crc32); //crc for payload + checksum
+ message_crc = (*crc32)();
+
+ u_int32_t x = htonl(message_crc);
+ sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
+
+ return sql_result.size();
+ }
+
+ void init_response()
+ { //12 positions for header-crc
+ sql_result = "012345678901";
+ }
+
+ void init_success_response()
+ {
+ m_buff_header.clear();
+ header_size = create_header_records();
+ sql_result.append(m_buff_header.c_str(), header_size);
+ sql_result.append(PAYLOAD_LINE);
+ }
+
+ void init_error_response(const char* error_message)
+ {//currently not in use. the headers in the case of error, are not extracted by AWS-cli.
+ m_buff_header.clear();
+ header_size = create_error_header_records(error_message);
+ sql_result.append(m_buff_header, header_size);
+ }
+
+ void send_success_response()
+ {
+ if (sql_result.size() > strlen(PAYLOAD_LINE))
+ {
+ sql_result.append(END_PAYLOAD_LINE);
+ int buff_len = create_message(header_size);
+ s->formatter->write_bin_data(sql_result.data(), buff_len);
+ }
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+
+ void send_error_response(const char* error_code,
+ const char* error_message,
+ const char* resource_id)
+ {
+
+ set_req_state_err(s, 0);//TODO what err_no?
+ dump_errno(s, 400);
+ end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
+ dump_start(s);
+
+ s->formatter->open_object_section("Error");
+
+ s->formatter->dump_string("Code", error_code);
+ s->formatter->dump_string("Message", error_message);
+ s->formatter->dump_string("Resource", "#Resource#");
+ s->formatter->dump_string("RequestId", resource_id);
+
+ s->formatter->close_section();
+
+ rgw_flush_formatter_and_reset(s, s->formatter);
+ }
+
+}; //end class aws_response_handler
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+ m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
+ chunk_number(0)
{
set_get_data(true);
}
return RGWGetObj_ObjStore_S3::get_params(y);
}
-void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i)
-{
- short x = htons(s);
- memcpy(buff, &x, sizeof(s));
- i+=sizeof(s);
-}
-
-void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i)
-{
- u_int32_t x = htonl(s);
- memcpy(buff, &x, sizeof(s));
- i+=sizeof(s);
-}
-
-int RGWSelectObj_ObjStore_S3::create_header_records(char* buff)
-{
- int i = 0;
-
- //headers description(AWS)
- //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
-
- //1
- buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
- memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
- i += strlen(header_name_str[EVENT_TYPE]);
- buff[i++] = char(7);
- encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
- memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
- i += strlen(header_value_str[RECORDS]);
-
- //2
- buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
- memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
- i += strlen(header_name_str[CONTENT_TYPE]);
- buff[i++] = char(7);
- encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
- memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
- i += strlen(header_value_str[OCTET_STREAM]);
-
- //3
- buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
- memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
- i += strlen(header_name_str[MESSAGE_TYPE]);
- buff[i++] = char(7);
- encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
- memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
- i += strlen(header_value_str[EVENT]);
-
- return i;
-}
-
-int RGWSelectObj_ObjStore_S3::create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len)
-{
- //message description(AWS):
- //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
- //s3select result is produced into m_result, the m_result is also the response-message, thus the attach headers and CRC
- //are created later to the produced SQL result, and actually wrapping the payload.
-
- u_int32_t total_byte_len = 0;
- u_int32_t preload_crc = 0;
- u_int32_t message_crc = 0;
- int i = 0;
- char * buff = out_string.data();
-
- if(crc32 ==0) {
- // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
- crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
- }
-
- total_byte_len = result_len + 16;//the total is greater in 4 bytes than current size
-
- encode_int(&buff[i], total_byte_len, i);//store sizes at the beginning of the buffer
- encode_int(&buff[i], header_len, i);
-
- crc32->reset();
- *crc32 = std::for_each( buff, buff + 8, *crc32 );//crc for starting 8 bytes
- preload_crc = (*crc32)();
- encode_int(&buff[i], preload_crc, i);
-
- i += result_len;//advance to the end of payload.
-
- crc32->reset();
- *crc32 = std::for_each( buff, buff + i, *crc32 );//crc for payload + checksum
- message_crc = (*crc32)();
- char out_encode[4];
- encode_int(out_encode, message_crc, i);
- out_string.append(out_encode,sizeof(out_encode));
-
- return i;
-}
-
-#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
-#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"
-
int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
{
int status = 0;
csv_object::csv_defintions csv;
-
- m_result = "012345678901"; //12 positions for header-crc
-
- int header_size = 0;
+ const char* s3select_syntax_error = "s3select-Syntax-Error";
+ const char* s3select_resource_id = "resourcse-id";
+ const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
if (m_s3_csv_object==0) {
s3select_syntax->parse_query(query);
csv.use_header_info=true;
}
+ m_aws_response_handler = std::make_unique<aws_response_handler>(s,this);
m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
}
- header_size = create_header_records(m_buff_header.get());
- m_result.append(m_buff_header.get(), header_size);
- m_result.append(PAYLOAD_LINE);
+ m_aws_response_handler.get()->init_response();
- if (s3select_syntax->get_error_description().empty() == false) {
- m_result.append(s3select_syntax->get_error_description());
- ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl;
- status = -1;
+ if (s3select_syntax->get_error_description().empty() == false)
+ { //error-flow (syntax-error)
+ m_aws_response_handler.get()->send_error_response(s3select_syntax_error,
+ s3select_syntax->get_error_description().c_str(),
+ s3select_resource_id);
+
+ ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl;
+ return -1;
}
- else {
- status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
- if(status<0) {
- m_result.append(m_s3_csv_object->get_error_description());
+ else
+ {
+
+ m_aws_response_handler.get()->init_success_response();
+
+ //query is correct(syntax), processing is starting.
+ status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler.get()->get_sql_result(), input, input_length, s->obj_size);
+ if (status < 0)
+ { //error flow(processing-time)
+ m_aws_response_handler.get()->send_error_response(s3select_processTime_error,
+ m_s3_csv_object->get_error_description().c_str(),
+ s3select_resource_id);
+
+ ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl;
+ return -1;
}
- }
- if (m_result.size() > strlen(PAYLOAD_LINE)) {
- m_result.append(END_PAYLOAD_LINE);
- int buff_len = create_message(m_result, m_result.size() - 12, header_size);
- s->formatter->write_bin_data(m_result.data(), buff_len);
- if (op_ret < 0) {
- return op_ret;
+ if (chunk_number == 0)
+ {//success flow
+ if (op_ret < 0)
+ {
+ set_req_state_err(s, op_ret);
+ }
+ dump_errno(s);
+ // Explicitly use chunked transfer encoding so that we can stream the result
+ // to the user without having to wait for the full length of it.
+ end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
}
+
+ chunk_number++;
}
- rgw_flush_formatter_and_reset(s, s->formatter);
+
+ m_aws_response_handler.get()->send_success_response();
return status;
}
return 0;
}
- if (chunk_number == 0) {
- if (op_ret < 0) {
- set_req_state_err(s, op_ret);
- }
- dump_errno(s);
- }
-
auto bl_len = bl.get_num_buffers();
- // Explicitly use chunked transfer encoding so that we can stream the result
- // to the user without having to wait for the full length of it.
- if (chunk_number == 0) {
- end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
- }
-
int status=0;
int i=0;
i++;
}
- chunk_number++;
return status;
}