From e3254b630601f454d349c79b2486403fb99470e5 Mon Sep 17 00:00:00 2001
From: gal salomon
Date: Mon, 12 Apr 2021 08:54:37 +0300
Subject: [PATCH] parquet implementation:
 (1) adding arrow/parquet to make (install is missing)
 (2) the s3select operation contains 2 flows: CSV and Parquet
 (3) upon the parquet flow, the s3select processing engine calls (via callbacks) get-size and range-request; the range requests are asynchronous, thus the caller waits until notification
 (4) flow: execute --> s3select --(arrow layer)--> range-request --> GetObj::execute --> send_response_data --> notify-range-request --> (back to) --> s3select
 (5) on the parquet flow, s3select handles the response itself (using callbacks) because of the AWS response limitation (16mb)
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

add unique pointer (rgw_api); verify magic number for parquet objects; s3select module update

fix buffer overflow (copy of range request)

change the range-request flow: it now needs to use the callback parameters (ofs & len) and not the element length

refactoring: separate the CSV flow from the parquet flow, a phase before adding the conditional build (depending on arrow package installation)

add arrow/parquet installation to debian/control

align s3select repo with RGW (missing APIs, such as get_error_description)

undefined reference to arrow symbol

fix comment: using optional_yield by value

fix comments; remove future/promise

s3select: a leak fix

s3select: fix result production

s3select, s3tests: parquet alignments

typo: git-remote --> git_remote

s3select: remove redundant comma (end of projections); bug fix in parquet flow upon aggregation queries

arrow/parquet editorial; remove blank lines

s3select: merged with master (output serialization, presto alignments)

merging (not rebasing) master functionalities into the parquet branch:
(*) dedicated source files for the s3select operation
(*) s3select-engine: fix leaks on parquet flows, enabling allocation of csv_object and parquet_object on the stack
(*) csv_object and parquet_object are allocated on the stack (no heap allocation)

move data members from heap to stack allocation; refactoring; separate flows for CSV and parquet

s3select: bug fix

conditional build: when the arrow package is installed the parquet flow becomes visible, enabling processing of parquet objects;
in case the package is not installed, only CSV is usable

remove redundant try/catch

s3select: fix compile warning

arrow-devel version should be higher than 4.0.0, where arrow::io::AsyncContext became deprecated

missing sudo; wrong url; move the rm -f arrow.list

replace codename with $(lsb_release -sc)

arrow version should be >= 4.0.0; the io context does not exist in the namespace on lower versions

RGW points to s3select/master

s3select submodule

sudo --> $SUDO

(illustrative sketches of the event-stream framing, the request parsing, and the parquet range-request flow are appended after the diff)

Signed-off-by: gal salomon
---
 ceph.spec.in                   |   5 +
 debian/control                 |   2 +
 install-deps.sh                |   7 +
 src/rgw/CMakeLists.txt         |  11 +
 src/rgw/rgw_rest_s3.cc         | 577 +---------------------------
 src/rgw/rgw_rest_s3.h          | 155 --------
 src/rgw/rgw_s3select.cc        | 669 +++++++++++++++++++++++++++++++++
 src/rgw/rgw_s3select.h         |   8 +
 src/rgw/rgw_s3select_private.h | 230 ++++++++++++
 src/s3select                   |   2 +-
 10 files changed, 936 insertions(+), 730 deletions(-)
 create mode 100644 src/rgw/rgw_s3select.cc
 create mode 100644 src/rgw/rgw_s3select.h
 create mode 100644 src/rgw/rgw_s3select_private.h

diff --git a/ceph.spec.in b/ceph.spec.in
index 1ed34e3f1a8d2..82f7a6f72cde0 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -24,6 +24,7 @@
 %bcond_with zbd
 %bcond_with cmake_verbose_logging
 %bcond_without ceph_test_package
+%bcond_without arrow_parquet
 %ifarch s390
 %bcond_with tcmalloc
 %else
@@ -232,6 +233,10 @@ BuildRequires: xfsprogs-devel
 BuildRequires: xmlstarlet
 BuildRequires: nasm
 BuildRequires: lua-devel
+%if 0%{with arrow_parquet}
+BuildRequires: arrow-devel >= 4.0.0
+BuildRequires: parquet-devel
+%endif
 %if 0%{with seastar} || 0%{with jaeger}
 BuildRequires: yaml-cpp-devel >= 0.6
 %endif
diff --git a/debian/control b/debian/control
index 0b89cc904a511..c252ca45fc95b 100644
--- a/debian/control
+++ b/debian/control
@@ -107,6 +107,8 @@ Build-Depends: automake,
 xmlstarlet ,
 nasm [amd64],
 zlib1g-dev,
+ libarrow-dev (>= 4.0.0),
+ libparquet-dev (>= 4.0.0),
 Standards-Version: 4.4.0
 
 Package: ceph
diff --git a/install-deps.sh b/install-deps.sh
index b07e53dd54e0d..0fa7675ea3576 100755
--- a/install-deps.sh
+++ b/install-deps.sh
@@ -355,12 +355,19 @@ else
         if $with_jaeger; then
             build_profiles+=",pkg.ceph.jaeger"
         fi
+
+        wget -qO - https://dist.apache.org/repos/dist/dev/arrow/KEYS | $SUDO apt-key add -
+        echo "deb [arch=amd64] https://apache.jfrog.io/artifactory/arrow/ubuntu $(lsb_release -sc) main" | $SUDO tee /etc/apt/sources.list.d/arrow.list
+        $SUDO apt update
+
         $SUDO env DEBIAN_FRONTEND=noninteractive mk-build-deps \
             --build-profiles "${build_profiles#,}" \
             --install --remove \
             --tool="apt-get -y --no-install-recommends $backports" $control || exit 1
         $SUDO env DEBIAN_FRONTEND=noninteractive apt-get -y remove ceph-build-deps
         if [ "$control" != "debian/control" ] ; then rm $control; fi
+        $SUDO rm -f /etc/apt/sources.list.d/arrow.list
+
         ;;
     centos|fedora|rhel|ol|virtuozzo)
         builddepcmd="dnf -y builddep --allowerasing"
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index d53dad434dff6..4104c5a7f2411 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -2,6 +2,14 @@ find_program(GPERF gperf)
 if(NOT GPERF)
   message(FATAL_ERROR "Can't find gperf")
 endif()
+
+find_package(Arrow QUIET)
+if(Arrow_FOUND)
+  set(ARROW_LIBRARIES "-larrow -lparquet")
+  add_definitions(-D_ARROW_EXIST)
+  message("-- arrow is installed, radosgw/s3select-op is able to process parquet objects")
+endif()
+
 function(gperf_generate input output)
   add_custom_command(
     OUTPUT ${output}
@@ -122,6 +130,7 @@ set(librgw_common_srcs
   rgw_rest_realm.cc
   rgw_rest_role.cc
   rgw_rest_s3.cc
+  rgw_s3select.cc
   rgw_role.cc
  rgw_sal.cc
  rgw_sal_rados.cc
@@ -204,6 +213,7 @@ target_link_libraries(rgw_common
     ${CURL_LIBRARIES}
     ${EXPAT_LIBRARIES}
     ${LUA_LIBRARIES}
+    ${ARROW_LIBRARIES}
   PUBLIC
     spawn)
 target_include_directories(rgw_common
@@ -302,6 +312,7 @@ target_link_libraries(rgw_a
   common_utf8 global
   ${CRYPTO_LIBS}
   ${LUA_LIBRARIES}
+  ${ARROW_LIBRARIES}
   OATH::OATH
   PUBLIC
   rgw_common
diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc
index 1f043f19a431f..1cb855d09e306 100644
--- a/src/rgw/rgw_rest_s3.cc
+++ b/src/rgw/rgw_rest_s3.cc
@@ -22,7 +22,6 @@
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
 #endif
-#include <s3select/include/s3select.h>
 #ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
 #pragma clang diagnostic pop
 #endif
@@ -71,6 +70,8 @@
 #include "rgw_sts.h"
 #include "rgw_sal_rados.h"
 
+#include "rgw_s3select.h"
+
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -4585,7 +4586,7 @@ RGWOp *RGWHandler_REST_Obj_S3::op_post()
     return new RGWInitMultipart_ObjStore_S3;
 
   if (is_select_op())
-    return new RGWSelectObj_ObjStore_S3;
+    return rgw::s3select::create_s3select_op();
 
   return new RGWPostObj_ObjStore_S3;
 }
@@ -6126,575 +6127,3 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
   return route == AwsRoute::QUERY_STRING
       && version == AwsVersion::UNKNOWN;
 }
-
-using namespace s3selectEngine;
-
-std::string &aws_response_handler::get_sql_result()
-{
-  return sql_result;
-}
-
-uint64_t aws_response_handler::get_processed_size()
-{
-  return processed_size;
-}
-
-void aws_response_handler::update_processed_size(uint64_t value)
-{
-  processed_size += value;
-}
-
-uint64_t aws_response_handler::get_total_bytes_returned()
-{
-  return total_bytes_returned;
-}
-
-void aws_response_handler::update_total_bytes_returned(uint64_t value)
-{
-  total_bytes_returned += value;
-}
-
-void aws_response_handler::push_header(const char *header_name, const char *header_value)
-{
-  char x;
-  short s;
-
-  x = char(strlen(header_name));
-  m_buff_header.append(&x, sizeof(x));
-  m_buff_header.append(header_name);
-
-  x = char(7);
-  m_buff_header.append(&x, sizeof(x));
-
-  s = htons(uint16_t(strlen(header_value)));
-  m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
-  m_buff_header.append(header_value);
-}
-
-#define IDX( x ) static_cast<int>( x )
-
-int aws_response_handler::create_header_records()
-{
-  //headers description(AWS)
-  //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
-
-  //1
-  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::RECORDS)]);
-  //2
-  push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
-  //3
-  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-
-  return m_buff_header.size();
-}
-
-int aws_response_handler::create_header_continuation()
-{
-  //headers description(AWS)
-  //1
-  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
-  //2
-  push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
-
-  return m_buff_header.size();
-}
-
-int aws_response_handler::create_header_progress()
-{
-  //headers description(AWS)
-  //1
-  push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::PROGRESS)]);
-  //2
-  push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)],
header_value_str[IDX(header_value_En::XML)]); - //3 - push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); - - return m_buff_header.size(); -} - -int aws_response_handler::create_header_stats() -{ - //headers description(AWS) - //1 - push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::STATS)]); - //2 - push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]); - //3 - push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); - - return m_buff_header.size(); -} - -int aws_response_handler::create_header_end() -{ - //headers description(AWS) - //1 - push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]); - //2 - push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); - - return m_buff_header.size(); -} - -int aws_response_handler::create_error_header_records(const char *error_message) -{ - //headers description(AWS) - //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] - - //1 - push_header(header_name_str[IDX(header_name_En::ERROR_CODE)], header_value_str[IDX(header_value_En::ENGINE_ERROR)]); - //2 - push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message); - //3 - push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]); - - return m_buff_header.size(); -} - -int aws_response_handler::create_message(u_int32_t header_len) -{ - //message description(AWS): - //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4] - //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC - //are created later to the produced SQL result, and actually wrapping the payload. 
- - auto push_encode_int = [&](u_int32_t s, int pos) { - u_int32_t x = htonl(s); - sql_result.replace(pos, sizeof(x), reinterpret_cast(&x), sizeof(x)); - }; - - u_int32_t total_byte_len = 0; - u_int32_t preload_crc = 0; - u_int32_t message_crc = 0; - - total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size - - push_encode_int(total_byte_len, 0); - push_encode_int(header_len, 4); - - crc32.reset(); - crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes - preload_crc = crc32(); - push_encode_int(preload_crc, 8); - - crc32.reset(); - crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum - message_crc = crc32(); - - u_int32_t x = htonl(message_crc); - sql_result.append(reinterpret_cast(&x), sizeof(x)); - - return sql_result.size(); -} - -void aws_response_handler::init_response() -{ //12 positions for header-crc - sql_result.resize(header_crc_size,'\0'); -} - -void aws_response_handler::init_success_response() -{ - m_buff_header.clear(); - header_size = create_header_records(); - sql_result.append(m_buff_header.c_str(), header_size); - sql_result.append(PAYLOAD_LINE); -} - -void aws_response_handler::send_continuation_response() -{ - sql_result.resize(header_crc_size,'\0'); - m_buff_header.clear(); - header_size = create_header_continuation(); - sql_result.append(m_buff_header.c_str(), header_size); - int buff_len = create_message(header_size); - s->formatter->write_bin_data(sql_result.data(), buff_len); - rgw_flush_formatter_and_reset(s, s->formatter); -} - -void aws_response_handler::init_progress_response() -{ - sql_result.resize(header_crc_size,'\0'); - m_buff_header.clear(); - header_size = create_header_progress(); - sql_result.append(m_buff_header.c_str(), header_size); -} - -void aws_response_handler::init_stats_response() -{ - sql_result.resize(header_crc_size,'\0'); - m_buff_header.clear(); - header_size = create_header_stats(); - sql_result.append(m_buff_header.c_str(), header_size); -} - -void aws_response_handler::init_end_response() -{ - sql_result.resize(header_crc_size,'\0'); - m_buff_header.clear(); - header_size = create_header_end(); - sql_result.append(m_buff_header.c_str(), header_size); - int buff_len = create_message(header_size); - s->formatter->write_bin_data(sql_result.data(), buff_len); - rgw_flush_formatter_and_reset(s, s->formatter); -} - -void aws_response_handler::init_error_response(const char *error_message) -{ //currently not in use. the headers in the case of error, are not extracted by AWS-cli. 
- m_buff_header.clear(); - header_size = create_error_header_records(error_message); - sql_result.append(m_buff_header.c_str(), header_size); -} - -void aws_response_handler::send_success_response() -{ - sql_result.append(END_PAYLOAD_LINE); - int buff_len = create_message(header_size); - s->formatter->write_bin_data(sql_result.data(), buff_len); - rgw_flush_formatter_and_reset(s, s->formatter); -} - -void aws_response_handler::send_error_response(const char *error_code, - const char *error_message, - const char *resource_id) -{ - - set_req_state_err(s, 0); - dump_errno(s, 400); - end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING); - dump_start(s); - - s->formatter->open_object_section("Error"); - - s->formatter->dump_string("Code", error_code); - s->formatter->dump_string("Message", error_message); - s->formatter->dump_string("Resource", "#Resource#"); - s->formatter->dump_string("RequestId", resource_id); - - s->formatter->close_section(); - - rgw_flush_formatter_and_reset(s, s->formatter); -} - -void aws_response_handler::send_progress_response() -{ - std::string progress_payload = fmt::format("{}{}{}" - ,get_processed_size(),get_processed_size(),get_total_bytes_returned()); - - sql_result.append(progress_payload); - int buff_len = create_message(header_size); - s->formatter->write_bin_data(sql_result.data(), buff_len); - rgw_flush_formatter_and_reset(s, s->formatter); -} - -void aws_response_handler::send_stats_response() -{ - std::string stats_payload = fmt::format("{}{}{}" - ,get_processed_size(),get_processed_size(),get_total_bytes_returned()); - - sql_result.append(stats_payload); - int buff_len = create_message(header_size); - s->formatter->write_bin_data(sql_result.data(), buff_len); - rgw_flush_formatter_and_reset(s, s->formatter); -} - -RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique()), - m_s3_csv_object(std::unique_ptr()), - chunk_number(0) -{ - set_get_data(true); -} - -RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3() -{} - -int RGWSelectObj_ObjStore_S3::get_params(optional_yield y) -{ - //retrieve s3-select query from payload - bufferlist data; - int ret; - int max_size = 4096; - std::tie(ret, data) = read_all_input(s, max_size, false); - if (ret != 0) { - ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl; - return ret; - } - - m_s3select_query = data.to_str(); - if (m_s3select_query.length() > 0) { - ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl; - } - else { - ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl; - return -1; - } - - int status = handle_aws_cli_parameters(m_sql_query); - if (status<0) { - return status; - } - - return RGWGetObj_ObjStore_S3::get_params(y); -} - -int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length) -{ - int status = 0; - uint32_t length_before_processing, length_post_processing; - csv_object::csv_defintions csv; - const char* s3select_syntax_error = "s3select-Syntax-Error"; - const char* s3select_resource_id = "resourcse-id"; - const char* s3select_processTime_error = "s3select-ProcessingTime-Error"; - - if (m_s3_csv_object==0) { - s3select_syntax->parse_query(query); - - if (m_row_delimiter.size()) { - csv.row_delimiter = *m_row_delimiter.c_str(); - } - - if (m_column_delimiter.size()) { - csv.column_delimiter = *m_column_delimiter.c_str(); - } - - if (m_quot.size()) { - csv.quot_char = *m_quot.c_str(); - } - - if (m_escape_char.size()) { - 
csv.escape_char = *m_escape_char.c_str(); - } - - if (m_enable_progress.compare("true")==0) { - enable_progress = true; - } else { - enable_progress = false; - } - - if (output_row_delimiter.size()) { - csv.output_row_delimiter = *output_row_delimiter.c_str(); - } - - if (output_column_delimiter.size()) { - csv.output_column_delimiter = *output_column_delimiter.c_str(); - } - - if (output_quot.size()) { - csv.output_quot_char = *output_quot.c_str(); - } - - if (output_escape_char.size()) { - csv.output_escape_char = *output_escape_char.c_str(); - } - - if(output_quote_fields.compare("ALWAYS") == 0) { - csv.quote_fields_always = true; - } - else if(output_quote_fields.compare("ASNEEDED") == 0) { - csv.quote_fields_asneeded = true; - } - - if(m_header_info.compare("IGNORE")==0) { - csv.ignore_header_info=true; - } - else if(m_header_info.compare("USE")==0) { - csv.use_header_info=true; - } - m_s3_csv_object = std::unique_ptr(new s3selectEngine::csv_object(s3select_syntax.get(), csv)); - } - - m_aws_response_handler->init_response(); - - if (s3select_syntax->get_error_description().empty() == false) - { //error-flow (syntax-error) - m_aws_response_handler->send_error_response(s3select_syntax_error, - s3select_syntax->get_error_description().c_str(), - s3select_resource_id); - - ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl; - return -1; - } - else - { - if (input == nullptr) { - input = ""; - } - m_aws_response_handler->init_success_response(); - length_before_processing = (m_aws_response_handler->get_sql_result()).size(); - - //query is correct(syntax), processing is starting. - status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler->get_sql_result(), input, input_length, s->obj_size); - length_post_processing = (m_aws_response_handler->get_sql_result()).size(); - m_aws_response_handler->update_total_bytes_returned(length_post_processing-length_before_processing); - if (status < 0) - { //error flow(processing-time) - m_aws_response_handler->send_error_response(s3select_processTime_error, - m_s3_csv_object->get_error_description().c_str(), - s3select_resource_id); - - ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl; - return -1; - } - - if (chunk_number == 0) - {//success flow - if (op_ret < 0) - { - set_req_state_err(s, op_ret); - } - dump_errno(s); - // Explicitly use chunked transfer encoding so that we can stream the result - // to the user without having to wait for the full length of it. 
- end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING); - } - chunk_number++; - } - - if (length_post_processing-length_before_processing != 0) { - m_aws_response_handler->send_success_response(); - } else { - m_aws_response_handler->send_continuation_response(); - } - - if (enable_progress == true) { - m_aws_response_handler->init_progress_response(); - m_aws_response_handler->send_progress_response(); - } - - return status; -} - -int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query) -{ - std::string input_tag{"InputSerialization"}; - std::string output_tag{"OutputSerialization"}; - - if(chunk_number !=0) { - return 0; - } - -#define GT ">" -#define LT "<" - if (m_s3select_query.find(GT) != std::string::npos) { - boost::replace_all(m_s3select_query, GT, ">"); - } - if (m_s3select_query.find(LT) != std::string::npos) { - boost::replace_all(m_s3select_query, LT, "<"); - } - - //AWS cli s3select parameters - extract_by_tag(m_s3select_query, "Expression", sql_query); - extract_by_tag(m_s3select_query, "Enabled", m_enable_progress); - - size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0); - size_t _qe = m_s3select_query.find("", _qi); - m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2)); - - extract_by_tag(m_s3select_input,"FieldDelimiter", m_column_delimiter); - extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot); - extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter); - extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info); - - if (m_row_delimiter.size()==0) { - m_row_delimiter='\n'; - } - else if(m_row_delimiter.compare(" ") == 0) - {//presto change - m_row_delimiter='\n'; - } - - extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char); - extract_by_tag(m_s3select_input, "CompressionType", m_compression_type); - - size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0); - size_t _qs = m_s3select_query.find("", _qi); - m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2)); - - extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter); - extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot); - extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char); - extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields); - extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter); - - if (output_row_delimiter.size()==0) { - output_row_delimiter='\n'; - } - else if(output_row_delimiter.compare(" ") == 0) - {//presto change - output_row_delimiter='\n'; - } - - if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) { - ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl; - return -1; - } - - return 0; -} - -int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result) -{ - result = ""; - size_t _qs = input.find("<" + tag_name + ">", 0); - size_t qs_input = _qs + tag_name.size() + 2; - if (_qs == std::string::npos) { - return -1; - } - size_t _qe = input.find("", qs_input); - if (_qe == std::string::npos) { - return -1; - } - - result = input.substr(qs_input, _qe - qs_input); - - return 0; -} - -int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len) -{ - int status=0; - if (m_aws_response_handler == nullptr) { - m_aws_response_handler = std::make_unique(s,this); - } - - 
if(len == 0 && s->obj_size != 0) { - return 0; - } - - if (s->obj_size == 0) { - status = run_s3select(m_sql_query.c_str(), nullptr, 0); - } else { - - auto bl_len = bl.get_num_buffers(); - - int i=0; - - for(auto& it : bl.buffers()) { - - ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs - << " len " << len << " obj-size " << s->obj_size << dendl; - - if(it.length() == 0) { - ldpp_dout(this, 10) << "s3select:it->_len is zero. segment " << i << " out of " << bl_len - << " obj-size " << s->obj_size << dendl; - continue; - } - - m_aws_response_handler->update_processed_size(it.length()); - - status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length()); - if(status<0) { - break; - } - i++; - } - } - - if (m_aws_response_handler->get_processed_size() == s->obj_size) { - if (status >=0) { - m_aws_response_handler->init_stats_response(); - m_aws_response_handler->send_stats_response(); - m_aws_response_handler->init_end_response(); - } - } - return status; -} diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h index 7c534cbfffd23..d9d60376d0728 100644 --- a/src/rgw/rgw_rest_s3.h +++ b/src/rgw/rgw_rest_s3.h @@ -908,161 +908,6 @@ inline int valid_s3_bucket_name(const std::string& name, bool relaxed=false) return 0; } -namespace s3selectEngine -{ -class s3select; -class csv_object; -} - - -class aws_response_handler -{//TODO this class should reside on s3select submodule - -private: - std::string sql_result; - struct req_state *s;//TODO will be replace by callback - uint32_t header_size; - // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum - boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32; - RGWOp *m_rgwop; - std::string m_buff_header; - uint64_t total_bytes_returned; - uint64_t processed_size; - - enum class header_name_En - { - EVENT_TYPE, - CONTENT_TYPE, - MESSAGE_TYPE, - ERROR_CODE, - ERROR_MESSAGE - }; - - enum class header_value_En - { - RECORDS, - OCTET_STREAM, - EVENT, - CONT, - PROGRESS, - END, - XML, - STATS, - ENGINE_ERROR, - ERROR_TYPE - }; - - const char *PAYLOAD_LINE= "\n\n\n\n"; - const char *END_PAYLOAD_LINE= "\n"; - const char *header_name_str[5] = {":event-type", ":content-type", ":message-type",":error-code",":error-message"}; - const char *header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error","error"}; - static constexpr size_t header_crc_size = 12; - - void push_header(const char * header_name,const char* header_value); - - int create_message(u_int32_t header_len); - -public: - //12 positions for header-crc - aws_response_handler(struct req_state *ps,RGWOp *rgwop) : sql_result("012345678901"), s(ps),m_rgwop(rgwop),total_bytes_returned{0},processed_size{0} - { } - - std::string &get_sql_result(); - - uint64_t get_processed_size(); - - void update_processed_size(uint64_t value); - - uint64_t get_total_bytes_returned(); - - void update_total_bytes_returned(uint64_t value); - - int create_header_records(); - - int create_header_continuation(); - - int create_header_progress(); - - int create_header_stats(); - - int create_header_end(); - - int create_error_header_records(const char* error_message); - - void init_response(); - - void init_success_response(); - - void send_continuation_response(); - - void init_progress_response(); - - void init_end_response(); - - void init_stats_response(); - - void init_error_response(const char* error_message); - - void 
send_success_response(); - - void send_progress_response(); - - void send_stats_response(); - - void send_error_response(const char* error_code, - const char* error_message, - const char* resource_id); - -}; //end class aws_response_handler - -class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3 -{ - -private: - std::unique_ptr s3select_syntax; - std::string m_s3select_query; - std::string m_s3select_input; - std::string m_s3select_output; - std::unique_ptr m_s3_csv_object; - std::string m_column_delimiter; - std::string m_quot; - std::string m_row_delimiter; - std::string m_compression_type; - std::string m_escape_char; - std::string m_header_info; - std::string m_sql_query; - std::string m_enable_progress; - std::string output_column_delimiter; - std::string output_quot; - std::string output_escape_char; - std::string output_quote_fields; - std::string output_row_delimiter; - - std::unique_ptr m_aws_response_handler; - bool enable_progress; - -public: - unsigned int chunk_number; - - RGWSelectObj_ObjStore_S3(); - virtual ~RGWSelectObj_ObjStore_S3(); - - virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override; - - virtual int get_params(optional_yield y) override; - -private: - - int run_s3select(const char* query, const char* input, size_t input_length); - - int extract_by_tag(std::string input, std::string tag_name, std::string& result); - - void convert_escape_seq(std::string& esc); - - int handle_aws_cli_parameters(std::string& sql_query); -}; - - namespace rgw::auth::s3 { class AWSEngine : public rgw::auth::Engine { diff --git a/src/rgw/rgw_s3select.cc b/src/rgw/rgw_s3select.cc new file mode 100644 index 0000000000000..19c787303be91 --- /dev/null +++ b/src/rgw/rgw_s3select.cc @@ -0,0 +1,669 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_s3select_private.h" + +namespace rgw::s3select { +RGWOp* create_s3select_op() +{ + return new RGWSelectObj_ObjStore_S3(); +} +}; + +using namespace s3selectEngine; + +std::string& aws_response_handler::get_sql_result() +{ + return sql_result; +} + +uint64_t aws_response_handler::get_processed_size() +{ + return processed_size; +} + +void aws_response_handler::update_processed_size(uint64_t value) +{ + processed_size += value; +} + +uint64_t aws_response_handler::get_total_bytes_returned() +{ + return total_bytes_returned; +} + +void aws_response_handler::update_total_bytes_returned(uint64_t value) +{ + total_bytes_returned += value; +} + +void aws_response_handler::push_header(const char* header_name, const char* header_value) +{ + char x; + short s; + x = char(strlen(header_name)); + m_buff_header.append(&x, sizeof(x)); + m_buff_header.append(header_name); + x = char(7); + m_buff_header.append(&x, sizeof(x)); + s = htons(uint16_t(strlen(header_value))); + m_buff_header.append(reinterpret_cast(&s), sizeof(s)); + m_buff_header.append(header_value); +} + +#define IDX( x ) static_cast( x ) + +int aws_response_handler::create_header_records() +{ + //headers description(AWS) + //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] + //1 + push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::RECORDS)]); + //2 + push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]); + //3 + push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); + 
return m_buff_header.size(); +} + +int aws_response_handler::create_header_continuation() +{ + //headers description(AWS) + //1 + push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]); + //2 + push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); + return m_buff_header.size(); +} + +int aws_response_handler::create_header_progress() +{ + //headers description(AWS) + //1 + push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::PROGRESS)]); + //2 + push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]); + //3 + push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); + return m_buff_header.size(); +} + +int aws_response_handler::create_header_stats() +{ + //headers description(AWS) + //1 + push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::STATS)]); + //2 + push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]); + //3 + push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); + return m_buff_header.size(); +} + +int aws_response_handler::create_header_end() +{ + //headers description(AWS) + //1 + push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]); + //2 + push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]); + return m_buff_header.size(); +} + +int aws_response_handler::create_error_header_records(const char* error_message) +{ + //headers description(AWS) + //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length] + //1 + push_header(header_name_str[IDX(header_name_En::ERROR_CODE)], header_value_str[IDX(header_value_En::ENGINE_ERROR)]); + //2 + push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message); + //3 + push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]); + return m_buff_header.size(); +} + +int aws_response_handler::create_message(u_int32_t header_len) +{ + //message description(AWS): + //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4] + //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC + //are created later to the produced SQL result, and actually wrapping the payload. 
+  auto push_encode_int = [&](u_int32_t s, int pos) {
+    u_int32_t x = htonl(s);
+    sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
+  };
+  u_int32_t total_byte_len = 0;
+  u_int32_t preload_crc = 0;
+  u_int32_t message_crc = 0;
+  total_byte_len = sql_result.size() + 4; //the total is 4 bytes greater than the current size
+  push_encode_int(total_byte_len, 0);
+  push_encode_int(header_len, 4);
+  crc32.reset();
+  crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for the first 8 bytes
+  preload_crc = crc32();
+  push_encode_int(preload_crc, 8);
+  crc32.reset();
+  crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+  message_crc = crc32();
+  u_int32_t x = htonl(message_crc);
+  sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
+  return sql_result.size();
+}
+
+void aws_response_handler::init_response()
+{
+  //12 positions for header-crc
+  sql_result.resize(header_crc_size, '\0');
+}
+
+void aws_response_handler::init_success_response()
+{
+  m_buff_header.clear();
+  header_size = create_header_records();
+  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(PAYLOAD_LINE);
+}
+
+void aws_response_handler::send_continuation_response()
+{
+  sql_result.resize(header_crc_size, '\0');
+  m_buff_header.clear();
+  header_size = create_header_continuation();
+  sql_result.append(m_buff_header.c_str(), header_size);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::init_progress_response()
+{
+  sql_result.resize(header_crc_size, '\0');
+  m_buff_header.clear();
+  header_size = create_header_progress();
+  sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::init_stats_response()
+{
+  sql_result.resize(header_crc_size, '\0');
+  m_buff_header.clear();
+  header_size = create_header_stats();
+  sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::init_end_response()
+{
+  sql_result.resize(header_crc_size, '\0');
+  m_buff_header.clear();
+  header_size = create_header_end();
+  sql_result.append(m_buff_header.c_str(), header_size);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::init_error_response(const char* error_message)
+{
+  //currently not in use; in the error case, the headers are not extracted by the AWS CLI.
+  m_buff_header.clear();
+  header_size = create_error_header_records(error_message);
+  sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::send_success_response()
+{
+  sql_result.append(END_PAYLOAD_LINE);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_error_response(const char* error_code,
+    const char* error_message,
+    const char* resource_id)
+{
+  set_req_state_err(s, 0);
+  dump_errno(s, 400);
+  end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  dump_start(s);
+  s->formatter->open_object_section("Error");
+  s->formatter->dump_string("Code", error_code);
+  s->formatter->dump_string("Message", error_message);
+  s->formatter->dump_string("Resource", "#Resource#");
+  s->formatter->dump_string("RequestId", resource_id);
+  s->formatter->close_section();
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_progress_response()
+{
+  std::string progress_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Progress>"
+                                 , get_processed_size(), get_processed_size(), get_total_bytes_returned());
+  sql_result.append(progress_payload);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_stats_response()
+{
+  std::string stats_payload = fmt::format("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>{}</BytesScanned><BytesProcessed>{}</BytesProcessed><BytesReturned>{}</BytesReturned></Stats>"
+                              , get_processed_size(), get_processed_size(), get_total_bytes_returned());
+  sql_result.append(stats_payload);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
+  m_buff_header(std::make_unique<char[]>(1000)),
+  m_parquet_type(false),
+  chunk_number(0)
+{
+  set_get_data(true);
+  fp_get_obj_size = [&]() {
+    return get_obj_size();
+  };
+  fp_range_req = [&](int64_t start, int64_t len, void* buff, optional_yield* y) {
+    ldout(s->cct, 10) << "S3select: range-request start: " << start << " length: " << len << dendl;
+    auto status = range_request(start, len, buff, *y);
+    return status;
+  };
+#ifdef _ARROW_EXIST
+  m_rgw_api.set_get_size_api(fp_get_obj_size);
+  m_rgw_api.set_range_req_api(fp_range_req);
+#endif
+  fp_result_header_format = [this](std::string& result) {
+    m_aws_response_handler.init_response();
+    m_aws_response_handler.init_success_response();
+    return 0;
+  };
+  fp_s3select_result_format = [this](std::string& result) {
+    m_aws_response_handler.send_success_response();
+    return 0;
+  };
+}
+
+RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+{}
+
+int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
+{
+  if(m_s3select_query.empty() == false) {
+    return 0;
+  }
+  if(s->object->get_name().find(".parquet") != std::string::npos) { //the aws-cli does not pass a parquet marker; detect parquet objects by name suffix
+#ifdef _ARROW_EXIST
+    m_parquet_type = true;
+#else
+    ldpp_dout(this, 10) << "arrow library is not installed" << dendl;
+#endif
+  }
+  //retrieve the s3-select query from the payload
+  bufferlist data;
+  int ret;
+  int max_size = 4096;
+  std::tie(ret, data) = read_all_input(s, max_size, false);
+  if (ret != 0) {
+    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
+    return ret;
+  }
+  m_s3select_query = data.to_str();
+  if (m_s3select_query.length() > 0) {
+    ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
+  } else {
+    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
+    return -1;
+  }
+  int status = handle_aws_cli_parameters(m_sql_query);
+  if (status < 0) {
+    return status;
+  }
+  return RGWGetObj_ObjStore_S3::get_params(y);
+}
+
+int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
+{
+  int status = 0;
+  uint32_t length_before_processing, length_post_processing;
+  csv_object::csv_defintions csv;
+  const char* s3select_syntax_error = "s3select-Syntax-Error";
+  const char* s3select_resource_id = "resource-id";
+  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
+
+  s3select_syntax.parse_query(query);
+  if (m_row_delimiter.size()) {
+    csv.row_delimiter = *m_row_delimiter.c_str();
+  }
+  if (m_column_delimiter.size()) {
+    csv.column_delimiter = *m_column_delimiter.c_str();
+  }
+  if (m_quot.size()) {
+    csv.quot_char = *m_quot.c_str();
+  }
+  if (m_escape_char.size()) {
+    csv.escape_char = *m_escape_char.c_str();
+  }
+  if (m_enable_progress.compare("true")==0) {
+    enable_progress = true;
+  } else {
+    enable_progress = false;
+  }
+  if (output_row_delimiter.size()) {
+    csv.output_row_delimiter = *output_row_delimiter.c_str();
+  }
+  if (output_column_delimiter.size()) {
+    csv.output_column_delimiter = *output_column_delimiter.c_str();
+  }
+  if (output_quot.size()) {
+    csv.output_quot_char = *output_quot.c_str();
+  }
+  if (output_escape_char.size()) {
+    csv.output_escape_char = *output_escape_char.c_str();
+  }
+  if(output_quote_fields.compare("ALWAYS") == 0) {
+    csv.quote_fields_always = true;
+  } else if(output_quote_fields.compare("ASNEEDED") == 0) {
+    csv.quote_fields_asneeded = true;
+  }
+  if(m_header_info.compare("IGNORE")==0) {
+    csv.ignore_header_info=true;
+  } else if(m_header_info.compare("USE")==0) {
+    csv.use_header_info=true;
+  }
+  m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
+  m_aws_response_handler.init_response();
+  if (s3select_syntax.get_error_description().empty() == false) {
+    //error flow (syntax error)
+    m_aws_response_handler.send_error_response(s3select_syntax_error,
+        s3select_syntax.get_error_description().c_str(),
+        s3select_resource_id);
+    ldpp_dout(this, 10) << "s3-select query: failed to parse query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+    return -1;
+  } else {
+    if (input == nullptr) {
+      input = "";
+    }
+    m_aws_response_handler.init_success_response();
+    length_before_processing = (m_aws_response_handler.get_sql_result()).size();
+    //the query syntax is correct; processing starts here.
+    status = m_s3_csv_object.run_s3select_on_stream(m_aws_response_handler.get_sql_result(), input, input_length, s->obj_size);
+    length_post_processing = (m_aws_response_handler.get_sql_result()).size();
+    m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
+    if (status < 0) {
+      //error flow (processing time)
+      m_aws_response_handler.send_error_response(s3select_processTime_error,
+          m_s3_csv_object.get_error_description().c_str(),
+          s3select_resource_id);
+      ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
+      return -1;
+    }
+    if (chunk_number == 0) {
+      //success flow
+      if (op_ret < 0) {
+        set_req_state_err(s, op_ret);
+      }
+      dump_errno(s);
+      // Explicitly use chunked transfer encoding so that we can stream the result
+      // to the user without having to wait for the full length of it.
+      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+    }
+    chunk_number++;
+  }
+  if (length_post_processing - length_before_processing != 0) {
+    m_aws_response_handler.send_success_response();
+  } else {
+    m_aws_response_handler.send_continuation_response();
+  }
+  if (enable_progress == true) {
+    m_aws_response_handler.init_progress_response();
+    m_aws_response_handler.send_progress_response();
+  }
+  return status;
+}
+
+int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
+{
+  int status = 0;
+#ifdef _ARROW_EXIST
+  if (!m_s3_parquet_object.is_set()) {
+    s3select_syntax.parse_query(m_sql_query.c_str());
+    try {
+      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
+    } catch(base_s3select_exception& e) {
+      ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
+      fp_result_header_format(m_aws_response_handler.get_sql_result());
+      m_aws_response_handler.get_sql_result().append(e.what());
+      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+      return -1;
+    }
+  }
+  if (s3select_syntax.get_error_description().empty() == false) {
+    fp_result_header_format(m_aws_response_handler.get_sql_result());
+    m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
+    fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+    ldpp_dout(this, 10) << "s3-select query: failed to parse query; {" << s3select_syntax.get_error_description() << "}" << dendl;
+    status = -1;
+  } else {
+    fp_result_header_format(m_aws_response_handler.get_sql_result());
+    status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
+    if (status < 0) {
+      m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
+      fp_s3select_result_format(m_aws_response_handler.get_sql_result());
+      ldout(s->cct, 10) << "S3select: failure while execution " << m_s3_parquet_object.get_error_description() << dendl;
+    }
+  }
+#endif
+  return status;
+}
+
+int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
+{
+  std::string input_tag{"InputSerialization"};
+  std::string output_tag{"OutputSerialization"};
+  if(chunk_number != 0) {
+    return 0;
+  }
+#define GT "&gt;"
+#define LT "&lt;"
+  if (m_s3select_query.find(GT) != std::string::npos) {
+    boost::replace_all(m_s3select_query, GT, ">");
+  }
+  if (m_s3select_query.find(LT) != std::string::npos) {
+    boost::replace_all(m_s3select_query, LT, "<");
+  }
+  //AWS CLI s3select parameters
+  extract_by_tag(m_s3select_query, "Expression", sql_query);
+  extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
+  size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
+  size_t _qe = m_s3select_query.find("</" + input_tag + ">", _qi);
+  m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2));
+  extract_by_tag(m_s3select_input, "FieldDelimiter", m_column_delimiter);
+  extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
+  extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
+  extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
+  if (m_row_delimiter.size()==0) {
+    m_row_delimiter='\n';
+  } else if(m_row_delimiter.compare("&#10;") == 0) {
+    //presto change
+    m_row_delimiter='\n';
+  }
+  extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char);
+  extract_by_tag(m_s3select_input, "CompressionType", m_compression_type);
+  size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0);
+  size_t _qs = m_s3select_query.find("</" + output_tag + ">", _qi);
+  m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2));
+  extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter);
+  extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot);
+  extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char);
+  extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields);
+  extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
+  if (output_row_delimiter.size()==0) {
+    output_row_delimiter='\n';
+  } else if(output_row_delimiter.compare("&#10;") == 0) {
+    //presto change
+    output_row_delimiter='\n';
+  }
+  if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
+    ldpp_dout(this, 10) << "RGW currently supports only the NONE option for compression type" << dendl;
+    return -1;
+  }
+  return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result)
+{
+  result = "";
+  size_t _qs = input.find("<" + tag_name + ">", 0);
+  size_t qs_input = _qs + tag_name.size() + 2;
+  if (_qs == std::string::npos) {
+    return -1;
+  }
+  size_t _qe = input.find("</" + tag_name + ">", qs_input);
+  if (_qe == std::string::npos) {
+    return -1;
+  }
+  result = input.substr(qs_input, _qe - qs_input);
+  return 0;
+}
+
+size_t RGWSelectObj_ObjStore_S3::get_obj_size()
+{
+  return s->obj_size;
+}
+
+int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff, optional_yield y)
+{
+  //purpose: implementation for arrow::ReadAt; this may take several async calls.
+  //send_response_data (the callback) accumulates the buffer; upon completion, control returns to ReadAt.
+ range_req_str = "bytes=" + std::to_string(ofs) + "-" + std::to_string(ofs+len-1); + range_str = range_req_str.c_str(); + range_parsed = false; + RGWGetObj::parse_range(); + requested_buffer.clear(); + m_request_range = len; + ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size() << dendl; + RGWGetObj::execute(y); + memcpy(buff, requested_buffer.data(), len); + ldout(s->cct, 10) << "S3select: done waiting, buffer is complete buffer-size:" << requested_buffer.size() << dendl; + return len; +} + +void RGWSelectObj_ObjStore_S3::execute(optional_yield y) +{ + int status = 0; + char parquet_magic[4]; + static constexpr uint8_t parquet_magic1[4] = {'P', 'A', 'R', '1'}; + static constexpr uint8_t parquet_magicE[4] = {'P', 'A', 'R', 'E'}; + get_params(y); +#ifdef _ARROW_EXIST + m_rgw_api.m_y = &y; +#endif + if (m_parquet_type) { + //parquet processing + range_request(0, 4, parquet_magic, y); + if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)) { + ldout(s->cct, 10) << s->object->get_name() << " does not contain parquet magic" << dendl; + op_ret = -ERR_INVALID_REQUEST; + return; + } + s3select_syntax.parse_query(m_sql_query.c_str()); + status = run_s3select_on_parquet(m_sql_query.c_str()); + if (status) { + ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl; + op_ret = -ERR_INVALID_REQUEST; + } else { + ldout(s->cct, 10) << "S3select: complete query with success " << dendl; + } + } else { + //CSV processing + RGWGetObj::execute(y); + } +} + +int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len) +{ + if (chunk_number == 0) { + if (op_ret < 0) { + set_req_state_err(s, op_ret); + } + dump_errno(s); + } + // Explicitly use chunked transfer encoding so that we can stream the result + // to the user without having to wait for the full length of it. 
+  if (chunk_number == 0) {
+    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  }
+  chunk_number++;
+  size_t append_in_callback = 0;
+  int part_no = 1;
+  //concatenate the requested buffer
+  for (auto& it : bl.buffers()) {
+    if(it.length() == 0) {
+      ldout(s->cct, 10) << "S3select: got a zero-length buffer while appending to the request buffer" << dendl;
+    }
+    append_in_callback += it.length();
+    ldout(s->cct, 10) << "S3select: part " << part_no++ << " it.length() = " << it.length() << dendl;
+    requested_buffer.append(&(it)[0]+ofs, len);
+  }
+  ldout(s->cct, 10) << "S3select: append_in_callback = " << append_in_callback << dendl;
+  if (requested_buffer.size() < m_request_range) {
+    ldout(s->cct, 10) << "S3select: need another round; buffer-size: " << requested_buffer.size() << " request-range length: " << m_request_range << dendl;
+    return 0;
+  } else { //buffer is complete
+    ldout(s->cct, 10) << "S3select: buffer is complete " << requested_buffer.size() << " request-range length: " << m_request_range << dendl;
+    m_request_range = 0;
+  }
+  return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
+{
+  int status = 0;
+
+  if (s->obj_size == 0) {
+    status = run_s3select(m_sql_query.c_str(), nullptr, 0);
+  } else {
+    auto bl_len = bl.get_num_buffers();
+    int i = 0;
+    for(auto& it : bl.buffers()) {
+      ldpp_dout(this, 10) << "processing segment " << i << " out of " << bl_len << " off " << ofs
+                          << " len " << len << " obj-size " << s->obj_size << dendl;
+      if(it.length() == 0) {
+        ldpp_dout(this, 10) << "s3select: it->length() is zero. segment " << i << " out of " << bl_len
+                            << " obj-size " << s->obj_size << dendl;
+        continue;
+      }
+      m_aws_response_handler.update_processed_size(it.length());
+      status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
+      if(status < 0) {
+        break;
+      }
+      i++;
+    }
+  }
+  if (m_aws_response_handler.get_processed_size() == s->obj_size) {
+    if (status >= 0) {
+      m_aws_response_handler.init_stats_response();
+      m_aws_response_handler.send_stats_response();
+      m_aws_response_handler.init_end_response();
+    }
+  }
+  return status;
+}
+
+int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
+{
+  if (!m_aws_response_handler.is_set()) {
+    m_aws_response_handler.set(s, this);
+  }
+  if(len == 0 && s->obj_size != 0) {
+    return 0;
+  }
+  if (m_parquet_type) {
+    return parquet_processing(bl, ofs, len);
+  }
+  return csv_processing(bl, ofs, len);
+}
+
diff --git a/src/rgw/rgw_s3select.h b/src/rgw/rgw_s3select.h
new file mode 100644
index 0000000000000..00d55cf93ea32
--- /dev/null
+++ b/src/rgw/rgw_s3select.h
@@ -0,0 +1,8 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+//
+
+namespace rgw::s3select {
+RGWOp* create_s3select_op();
+}
+
diff --git a/src/rgw/rgw_s3select_private.h b/src/rgw/rgw_s3select_private.h
new file mode 100644
index 0000000000000..9174c04908fe3
--- /dev/null
+++ b/src/rgw/rgw_s3select_private.h
@@ -0,0 +1,230 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+//
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "common/ceph_crypto.h"
+#include "common/split.h"
+#include "common/Formatter.h"
+#include "common/utf8.h"
+#include "common/ceph_json.h"
+#include "common/safe_io.h"
+#include "common/errno.h"
+#include "auth/Crypto.h"
+#include
+#include
+#include
+#define BOOST_BIND_GLOBAL_PLACEHOLDERS
+#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wimplicit-const-int-float-conversion"
+#endif
+#include <s3select/include/s3select.h>
+#ifdef HAVE_WARN_IMPLICIT_CONST_INT_FLOAT_CONVERSION
+#pragma clang diagnostic pop
+#endif
+#undef BOOST_BIND_GLOBAL_PLACEHOLDERS
+
+#include <boost/crc.hpp>
+
+#include
+#include "rgw_rest_s3.h"
+#include "rgw_s3select.h"
+
+class aws_response_handler
+{
+
+private:
+  std::string sql_result;
+  struct req_state* s;
+  uint32_t header_size;
+  // the parameters follow the CRC-32 algorithm and are aligned with the AWS CLI checksum
+  boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
+  RGWOp* m_rgwop;
+  std::string m_buff_header;
+  uint64_t total_bytes_returned;
+  uint64_t processed_size;
+
+  enum class header_name_En {
+    EVENT_TYPE,
+    CONTENT_TYPE,
+    MESSAGE_TYPE,
+    ERROR_CODE,
+    ERROR_MESSAGE
+  };
+
+  enum class header_value_En {
+    RECORDS,
+    OCTET_STREAM,
+    EVENT,
+    CONT,
+    PROGRESS,
+    END,
+    XML,
+    STATS,
+    ENGINE_ERROR,
+    ERROR_TYPE
+  };
+
+  const char* PAYLOAD_LINE= "\n<?xml version=\"1.0\" encoding=\"UTF-8\"?><Payload>\n<Records>\n<Payload>\n";
+  const char* END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
+  const char* header_name_str[5] = {":event-type", ":content-type", ":message-type", ":error-code", ":error-message"};
+  const char* header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error", "error"};
+  static constexpr size_t header_crc_size = 12;
+
+  void push_header(const char* header_name, const char* header_value);
+
+  int create_message(u_int32_t header_len);
+
+public:
+  aws_response_handler(struct req_state* ps, RGWOp* rgwop) : s(ps), m_rgwop(rgwop), total_bytes_returned{0}, processed_size{0}
+  {}
+
+  aws_response_handler() : s(nullptr), m_rgwop(nullptr), total_bytes_returned{0}, processed_size{0}
+  {}
+
+  bool is_set()
+  {
+    if(s==nullptr || m_rgwop == nullptr) {
+      return false;
+    }
+    return true;
+  }
+
+  void set(struct req_state* ps, RGWOp* rgwop)
+  {
+    s = ps;
+    m_rgwop = rgwop;
+  }
+
+  std::string& get_sql_result();
+
+  uint64_t get_processed_size();
+
+  void update_processed_size(uint64_t value);
+
+  uint64_t get_total_bytes_returned();
+
+  void update_total_bytes_returned(uint64_t value);
+
+  int create_header_records();
+
+  int create_header_continuation();
+
+  int create_header_progress();
+
+  int create_header_stats();
+
+  int create_header_end();
+
+  int create_error_header_records(const char* error_message);
+
+  void init_response();
+
+  void init_success_response();
+
+  void send_continuation_response();
+
+  void init_progress_response();
+
+  void init_end_response();
+
+  void init_stats_response();
+
+  void init_error_response(const char* error_message);
+
+  void send_success_response();
+
+  void send_progress_response();
+
+  void send_stats_response();
+
+  void send_error_response(const char* error_code,
+                           const char* error_message,
+                           const char* resource_id);
+
+}; //end class aws_response_handler
+
+class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
+{
+
+private:
+  s3selectEngine::s3select s3select_syntax;
+  std::string m_s3select_query;
+  std::string m_s3select_input;
+  std::string m_s3select_output;
+  s3selectEngine::csv_object m_s3_csv_object;
+#ifdef _ARROW_EXIST
+  s3selectEngine::parquet_object m_s3_parquet_object;
+#endif
+  std::string m_column_delimiter;
+  std::string m_quot;
+  std::string m_row_delimiter;
+  std::string m_compression_type;
+  std::string m_escape_char;
+  std::unique_ptr<char[]> m_buff_header;
+  std::string m_header_info;
+  std::string m_sql_query;
+  std::string m_enable_progress;
+  std::string output_column_delimiter;
+  std::string output_quot;
+  std::string output_escape_char;
+  std::string output_quote_fields;
+  std::string output_row_delimiter;
+  aws_response_handler m_aws_response_handler;
+  bool enable_progress;
+
+  //parquet request
+  bool m_parquet_type;
+#ifdef _ARROW_EXIST
+  s3selectEngine::rgw_s3select_api m_rgw_api;
+#endif
+  //a request for a range may be satisfied by several calls to send_response_data;
+  size_t m_request_range;
+  std::string requested_buffer;
+  std::string range_req_str;
+  std::function<int(std::string&)> fp_result_header_format;
+  std::function<int(std::string&)> fp_s3select_result_format;
+  int m_header_size;
+
+public:
+  unsigned int chunk_number;
+
+  RGWSelectObj_ObjStore_S3();
+  virtual ~RGWSelectObj_ObjStore_S3();
+
+  virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
+
+  virtual int get_params(optional_yield y) override;
+
+  virtual void execute(optional_yield) override;
+
+private:
+
+  int csv_processing(bufferlist& bl, off_t ofs, off_t len);
+
+  int parquet_processing(bufferlist& bl, off_t ofs, off_t len);
+
+  int run_s3select(const char* query, const char* input, size_t input_length);
+
+  int run_s3select_on_parquet(const char* query);
+
+  int extract_by_tag(std::string input, std::string tag_name, std::string& result);
+
+  void convert_escape_seq(std::string& esc);
+
+  int handle_aws_cli_parameters(std::string& sql_query);
+
+  int range_request(int64_t start, int64_t len, void*, optional_yield);
+
+  size_t get_obj_size();
+  std::function<int(int64_t, int64_t, void*, optional_yield*)> fp_range_req;
+  std::function<size_t(void)> fp_get_obj_size;
+
+};
+
diff --git a/src/s3select b/src/s3select
index f118a761300d7..1609bb2ab5441 160000
--- a/src/s3select
+++ b/src/s3select
@@ -1 +1 @@
-Subproject commit f118a761300d7d10b910bc1ba24935e093c064e7
+Subproject commit 1609bb2ab5441d2314f56858f4f98fc2be509f89
-- 
2.39.5
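
The sketches below expand on the mechanisms above for reviewers; they are standalone illustrations, not part of the patch, and every name in them is invented for the example. First, the response framing: aws_response_handler::create_message() wraps each SQL result in the AWS event-stream envelope — a 12-byte prelude (total byte length, header byte length, prelude CRC), then the headers, the payload, and a trailing CRC over the entire message. A minimal equivalent, using the same CRC-32 parameters as the patch's boost::crc_optimal member:

    #include <boost/crc.hpp>
    #include <arpa/inet.h>   // htonl
    #include <cstdint>
    #include <string>

    // layout: [total-length:4][header-length:4][prelude-crc:4][headers][payload][message-crc:4]
    std::string build_event_stream_message(const std::string& headers, const std::string& payload)
    {
      std::string msg(12, '\0');                    // reserve the 12-byte prelude
      msg += headers;
      msg += payload;
      uint32_t total = htonl(msg.size() + 4);       // +4 for the trailing message CRC
      uint32_t hlen = htonl(headers.size());
      msg.replace(0, 4, reinterpret_cast<char*>(&total), 4);
      msg.replace(4, 4, reinterpret_cast<char*>(&hlen), 4);
      boost::crc_32_type crc;                       // same polynomial/params as the patch's crc32 member
      crc.process_bytes(msg.data(), 8);             // prelude CRC covers the two length fields
      uint32_t pcrc = htonl(crc.checksum());
      msg.replace(8, 4, reinterpret_cast<char*>(&pcrc), 4);
      crc.reset();
      crc.process_bytes(msg.data(), msg.size());    // message CRC covers prelude + headers + payload
      uint32_t mcrc = htonl(crc.checksum());
      msg.append(reinterpret_cast<char*>(&mcrc), 4);
      return msg;
    }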
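
Each header inside that envelope is the compact binary form that push_header() emits: a one-byte name length, the name, a one-byte value type (7 marks a string), a big-endian two-byte value length, and the value bytes. A sketch of the same encoding:

    #include <arpa/inet.h>   // htons
    #include <cstdint>
    #include <cstring>
    #include <string>

    // [header-name-byte-length:1][header-name][header-value-type:1][value-length:2][header-value]
    void append_string_header(std::string& buf, const char* name, const char* value)
    {
      char name_len = static_cast<char>(strlen(name));
      buf.append(&name_len, 1);                           // one-byte name length
      buf.append(name);                                   // header name
      char value_type = 7;                                // 7 == string-typed header value
      buf.append(&value_type, 1);                         // one-byte value type
      uint16_t value_len = htons(static_cast<uint16_t>(strlen(value)));
      buf.append(reinterpret_cast<char*>(&value_len), 2); // two-byte big-endian value length
      buf.append(value);                                  // header value
    }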
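
The parquet flow in item (4) of the subject — arrow issues ReadAt, range_request() translates it into a ranged GetObj::execute(), and parquet_processing() appends chunks until the requested range is complete — reduces to the accumulate-until-complete pattern below (a loose model; the struct and its members are invented):

    #include <cstring>
    #include <string>

    struct range_accumulator {
      std::string buffer;   // plays the role of requested_buffer
      size_t      want = 0; // plays the role of m_request_range

      void start(size_t len) { buffer.clear(); want = len; }

      // called once per delivered chunk, as parquet_processing() is;
      // returns true once the requested range is fully buffered
      bool on_chunk(const char* data, size_t n) {
        buffer.append(data, n);
        return buffer.size() >= want;   // otherwise: "need another round"
      }

      // the copy-out that range_request() performs before returning to arrow::ReadAt
      void finish(void* out, size_t len) { memcpy(out, buffer.data(), len); }
    };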
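
handle_aws_cli_parameters() pulls each setting out of the request XML with extract_by_tag(), which simply slices the text between <Tag> and </Tag>. A standalone equivalent with a usage example:

    #include <iostream>
    #include <string>

    int extract_by_tag(const std::string& input, const std::string& tag, std::string& result)
    {
      size_t open = input.find("<" + tag + ">");
      if (open == std::string::npos) return -1;
      size_t start = open + tag.size() + 2;             // skip past "<tag>"
      size_t close = input.find("</" + tag + ">", start);
      if (close == std::string::npos) return -1;
      result = input.substr(start, close - start);
      return 0;
    }

    int main()
    {
      std::string body = "<Expression>select count(*) from s3object;</Expression>";
      std::string sql;
      if (extract_by_tag(body, "Expression", sql) == 0)
        std::cout << sql << std::endl;                  // select count(*) from s3object;
    }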
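
Finally, the magic-number gate in execute(): a parquet file begins with the four bytes "PAR1" ("PARE" for the encrypted-footer variant), and the patch fetches the first four bytes via range_request() before committing to the parquet path. The check itself reduces to:

    #include <cstring>

    bool looks_like_parquet(const char magic[4])
    {
      return std::memcmp(magic, "PAR1", 4) == 0 ||   // plain parquet magic
             std::memcmp(magic, "PARE", 4) == 0;     // encrypted-footer variant
    }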