From: gal salomon
Date: Wed, 12 Feb 2020 04:21:54 +0000 (+0200)
Subject: adding s3select sub-module; integrating sub-module into RGW; current commit is able...
X-Git-Tag: v16.1.0~1923^2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=61d6813726fc08980903c6a6c50b0a5797ba7193;p=ceph.git

adding s3select sub-module; integrating sub-module into RGW; current commit is able
to run s3-select queries on CSV-s3object;

Signed-off-by: gal salomon
---

diff --git a/.gitmodules b/.gitmodules
index eead9cd910f3..6280d8e93eed 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -67,3 +67,6 @@
 [submodule "src/pybind/mgr/rook/rook-client-python"]
 	path = src/pybind/mgr/rook/rook-client-python
 	url = https://github.com/ceph/rook-client-python.git
+[submodule "s3select"]
+	path = src/s3select
+	url = https://github.com/ceph/s3select.git
diff --git a/doc/radosgw/index.rst b/doc/radosgw/index.rst
index 7c85ade8cfee..1d11e2a1d305 100644
--- a/doc/radosgw/index.rst
+++ b/doc/radosgw/index.rst
@@ -78,4 +78,5 @@ you may write data with one API and retrieve it with the other.
    Manpage radosgw <../../man/8/radosgw>
    Manpage radosgw-admin <../../man/8/radosgw-admin>
    QAT Acceleration for Encryption and Compression
+   S3-select
diff --git a/doc/radosgw/s3select.rst b/doc/radosgw/s3select.rst
new file mode 100644
index 000000000000..dc6415ac1a09
--- /dev/null
+++ b/doc/radosgw/s3select.rst
@@ -0,0 +1,268 @@
+===============
+ Ceph s3 select
+===============
+
+.. contents::
+
+Overview
+--------
+
+ | The purpose of the **s3 select** engine is to create an efficient pipe between the client and the storage nodes (the engine should be as close as possible to the storage).
+ | It enables selection of a restricted subset of (structured) data stored in an S3 object, using an SQL-like syntax.
+ | It also enables higher-level analytic applications (such as SPARK-SQL) to use this feature to improve their latency and throughput.
+
+ | For example, given an s3-object of several GB (a CSV file), a user may need to extract a single column filtered by another column,
+ | as in the following query:
+ | ``select customer-id from s3Object where age>30 and age<65;``
+
+ | Currently the whole s3-object must be retrieved from the OSD via RGW before filtering and extracting data.
+ | By "pushing down" the query into the OSD, it is possible to save a lot of network traffic and CPU time (serialization / deserialization).
+
+ | **The bigger the object and the more selective the query, the better the performance.**
+
+Basic workflow
+--------------
+
+ | An s3-select query is sent to RGW via the `AWS-CLI `_.
+
+ | It passes the authentication and permission checks as an incoming (POST) message.
+ | **RGWSelectObj_ObjStore_S3::send_response_data** is the "entry point"; it handles each chunk fetched for the requested object-key.
+ | **send_response_data** first handles the input: it extracts the query and the other CLI parameters.
+
+ | For each newly fetched chunk (~4MB), RGW executes the s3-select query on it.
+ | The current implementation supports CSV objects; since chunk boundaries randomly "cut" CSV rows in the middle, those broken lines (the first or last of each chunk) are skipped while the query is processed.
+ | Each "broken" line is stored and later merged with the matching broken line belonging to the next chunk, and then processed.
+
+ | For each processed chunk, an output message is formatted according to the `AWS specification `_ and sent back to the client.
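+
+ | Each output message follows the AWS event-stream encoding: a 12-byte prelude (total message length, headers length, and a CRC over those first 8 bytes), a list of string headers, the payload, and a trailing CRC over the whole message; in this commit the framing is produced by ``create_header_records`` and ``create_message`` in ``rgw_rest_s3.cc``.
+ | The following sketch (Python, illustrative only — it is not part of RGW) shows the same byte layout:
+
+::
+
+    import struct, zlib
+
+    def encode_header(name, value):
+        # 1-byte name length, name, value-type 7 (string), 2-byte value length, value
+        return (bytes([len(name)]) + name.encode()
+                + bytes([7]) + struct.pack(">H", len(value)) + value.encode())
+
+    def encode_records_message(payload):
+        headers = (encode_header(":event-type", "Records")
+                   + encode_header(":content-type", "application/octet-stream")
+                   + encode_header(":message-type", "event"))
+        total_len = 12 + len(headers) + len(payload) + 4   # prelude + headers + payload + message CRC
+        prelude = struct.pack(">II", total_len, len(headers))
+        prelude_crc = struct.pack(">I", zlib.crc32(prelude))
+        message = prelude + prelude_crc + headers + payload
+        return message + struct.pack(">I", zlib.crc32(message))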
+ | RGW returns the following response headers: ``{:event-type,records} {:content-type,application/octet-stream} {:message-type,event}``.
+ | For aggregation queries the last chunk must be identified as the end of input; at that point the s3-select engine finalizes the query and produces the aggregated result.
+
+
+Basic functionalities
+~~~~~~~~~~~~~~~~~~~~~
+
+ | **S3select** has a definite set of functionalities that should be implemented (if we wish to stay compliant with AWS); currently only a portion of it is implemented.
+
+ | The implemented software architecture supports basic arithmetic expressions, logical and compare expressions, including nested function calls and casting operators, which already gives the user reasonable flexibility.
+ | Review the feature-table_ below.
+
+
+Error Handling
+~~~~~~~~~~~~~~
+
+ | Any error that occurs while processing the input query, i.e. during the parsing phase or the execution phase, is returned to the client as an error-response message.
+
+ | An error of fatal severity (attached to the exception) ends query execution immediately; errors of other severities are counted, and upon reaching 100 of them, query execution ends with an error message.
+
+
+Features Support
+----------------
+
+.. _feature-table:
+
+ | Currently only part of the `AWS select command `_ is implemented; the table below describes what is currently supported.
+ | The following table describes the current implementation of the s3-select functionalities:
+
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Feature | Details | Example |
++=================================+=================+=======================================================================+
+| Arithmetic operators | ^ * / + - ( ) | select (int(_1)+int(_2))*int(_9) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| | | select ((1+2)*3.14) ^ 2 from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Compare operators | > < >= <= == != | select _1,_2 from stdin where (int(1)+int(_3))>int(_5); |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Logical operators | AND OR | select count(*) from stdin where int(1)>123 and int(_5)<200; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Casting operator | int(expression) | select int(_1),int( 1.2 + 3.4) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| |float(expression)| select float(1.2) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| | timestamp(...) | select timestamp("1999:10:10-12:23:44") from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | sum | select sum(int(_1)) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | min | select min( int(_1) * int(_5) ) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | max | select max(float(_1)),min(int(_5)) from stdin; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function | count | select count(*) from stdin where (int(1)+int(_3))>int(_5); |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | extract | select count(*) from stdin where |
+| | | extract("year",timestamp(_2)) > 1950 |
+| | | and extract("year",timestamp(_1)) < 1960; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | dateadd | select count(0) from stdin where |
+| | | datediff("year",timestamp(_1),dateadd("day",366,timestamp(_1))) == 1; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | datediff | select count(0) from stdin where |
+| | | datediff("month",timestamp(_1),timestamp(_2)) == 2; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions | utcnow | select count(0) from stdin where |
+| | | datediff("hours",utcnow(),dateadd("day",1,utcnow())) == 24 ; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| String Functions | substr | select count(0) from stdin where |
+| | | int(substr(_1,1,4))>1950 and int(substr(_1,1,4))<1960; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Alias support | | select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 |
+| | | from stdin where a3>100 and a3<300; |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+
+s3-select function interfaces
+-----------------------------
+
+Timestamp functions
+~~~~~~~~~~~~~~~~~~~
+
+ | The `timestamp functionalities `_ are only partially implemented.
+ | The casting operator ``timestamp( string )`` converts a string to the timestamp basic type.
+ | Currently it can convert strings of the pattern ``yyyy:mm:dd hh:mi:ss``.
+
+ | ``extract( date-part , timestamp )`` : returns an integer, the date-part extracted from the input timestamp.
+ | Supported date-parts: year, month, week, day.
+
+ | ``dateadd( date-part , integer , timestamp )`` : returns a timestamp, the result of adding the given number of date-parts to the input timestamp.
+ | Supported date-parts: year, month, day.
+
+ | ``datediff( date-part , timestamp , timestamp )`` : returns an integer, the difference between the two timestamps in units of the date-part.
+ | Supported date-parts: year, month, day, hours.
+
+ | ``utcnow()`` : returns the timestamp of the current time.
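+
+ | The exact semantics of each date-part are defined by the s3select engine; the short sketch below (Python, illustrative only — not part of RGW) merely mirrors what the examples in the feature table above imply, e.g. adding 366 days pushes a timestamp into the next calendar year, so ``datediff("year",...)`` over it returns 1:
+
+::
+
+    from datetime import datetime, timedelta
+
+    ts = datetime.strptime("1999:10:10-12:23:44", "%Y:%m:%d-%H:%M:%S")  # timestamp("1999:10:10-12:23:44")
+    year = ts.year                                                      # extract("year", ts)
+    later = ts + timedelta(days=366)                                    # dateadd("day", 366, ts)
+    assert later.year - ts.year == 1                                    # datediff("year", ts, later), approximated
+                                                                        # here as the difference of the year fields
+    assert int((later - ts).total_seconds() // 3600) == 366 * 24        # datediff("hours", ts, later)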
+
+Aggregation functions
+~~~~~~~~~~~~~~~~~~~~~
+
+ | ``count()`` : returns an integer, the number of rows matching the condition (if one exists).
+
+ | ``sum(expression)`` : returns the sum of the expression over all rows matching the condition (if one exists).
+
+ | ``max(expression)`` : returns the maximum of the expression over all rows matching the condition (if one exists).
+
+ | ``min(expression)`` : returns the minimum of the expression over all rows matching the condition (if one exists).
+
+String functions
+~~~~~~~~~~~~~~~~
+
+ | ``substr(string,from,to)`` : returns the substring of the input string defined by the from,to inputs.
+
+
+Alias
+~~~~~
+
+ | The **alias** programming construct is an essential part of the s3-select language; it makes queries much easier to write, especially for objects containing many columns or for complex queries.
+
+ | Upon parsing a statement containing an alias construct, the engine replaces the alias with a reference to the correct projection column; at query execution time the reference is evaluated like any other expression.
+
+ | There is a risk that a self (or cyclic) reference may occur and cause a stack overflow (an endless loop); to address this, an alias is validated for cyclic references when it is evaluated.
+
+ | An alias also maintains a result cache: when the same alias is used more than once, the expression is not evaluated again; the cached result is returned instead.
+
+ | Of course, the cache is invalidated for each new row.
+
+Sending Query to RGW
+--------------------
+
+ | Any HTTP client can send an s3-select request to RGW, as long as it is compliant with the `AWS Request syntax `_.
+
+ | Sending an s3-select request to RGW using the AWS CLI should follow the `AWS command reference `_.
+ | Below is an example:
+
+::
+
+   aws --endpoint-url http://localhost:8000 s3api select-object-content \
+    --bucket {BUCKET-NAME} \
+    --expression-type 'SQL' \
+    --input-serialization \
+    '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' \
+    --output-serialization '{"CSV": {}}' \
+    --key {OBJECT-NAME} \
+    --expression "select count(0) from stdin where int(_1)<10;" output.csv
+
+Syntax
+~~~~~~
+
+ | **Input serialization** (implemented) lets the user define the CSV format; the default values are {\\n} for the row delimiter, {,} for the field delimiter, {"} for the quote character, and {\\} for the escape character.
+ | It also handles the **csv-header-info**, i.e. the case where the first row of the input object contains the schema.
+ | **Output serialization** and **compression-type** are currently not implemented.
+
+ | The s3-select engine contains a CSV parser, which parses s3-objects as follows:
+ | - each row ends with the row delimiter.
+ | - the field separator separates adjacent columns; successive field separators define a NULL column.
+ | - the quote character overrides the field separator, i.e. a field separator between quotes is treated like any other character.
+ | - the escape character disables the special meaning of any character, except for the row delimiter.
+
+ | The table in the next section gives examples of the CSV parsing rules.
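+
+ | The snippet below (Python, illustrative only — it is not part of RGW or of the s3select engine) approximates that tokenization with Python's ``csv`` module, whose delimiter/quote/escape semantics are very close to the rules above:
+
+::
+
+    import csv, io
+
+    def tokens(line, delim=",", quote='"', esc="\\"):
+        # approximate the engine's tokenization of a single CSV row
+        return next(csv.reader(io.StringIO(line), delimiter=delim,
+                               quotechar=quote, escapechar=esc))
+
+    tokens(",,1,,2,")               # ['', '', '1', '', '2', '']  successive delimiters -> null columns
+    tokens('11,22,"a,b,c,d",last')  # ['11', '22', 'a,b,c,d', 'last']  quotes override the delimiter
+                                    # (Python strips the quotes; the table below shows them kept)
+    tokens('11,22,str=\\"abcd\\"\\,str2=\\"123\\",last')
+                                    # ['11', '22', 'str="abcd",str2="123"', 'last']  escape removes special meaning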
+
+CSV parsing behavior
+--------------------
+
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Feature | Description | input ==> tokens |
++=================================+=================+=======================================================================+
+| NULL | successive | ,,1,,2, ==> {null}{null}{1}{null}{2}{null} |
+| | field delimiters | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| QUOTE | quote character | 11,22,"a,b,c,d",last ==> {11}{22}{"a,b,c,d"}{last} |
+| | overrides | |
+| | field delimiter | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Escape | escape char | 11,22,str=\\"abcd\\"\\,str2=\\"123\\",last |
+| | overrides | ==> {11}{22}{str="abcd",str2="123"}{last} |
+| | meta-character; | |
+| | escape removed | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| row delimiter | no closing quote; | 11,22,a="str,44,55,66 |
+| | row delimiter | ==> {11}{22}{a="str,44,55,66} |
+| | closes the line | |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| csv header info | FileHeaderInfo | "**USE**" means each token on the first line is a column name; |
+| | tag | "**IGNORE**" means skip the first line |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+
+
+BOTO3
+-----
+
+ | Using boto3 is "natural" and easy, since ``select_object_content`` takes the same parameters as the AWS CLI:
+
+::
+
+  import boto3
+
+  def run_s3select(bucket, key, query, column_delim=",", row_delim="\n", quot_char='"', esc_char='\\', csv_header_info="NONE"):
+      s3 = boto3.client('s3',
+                        endpoint_url=endpoint,
+                        aws_access_key_id=access_key,
+                        region_name=region_name,
+                        aws_secret_access_key=secret_key)
+
+      r = s3.select_object_content(
+          Bucket=bucket,
+          Key=key,
+          ExpressionType='SQL',
+          InputSerialization={"CSV": {"RecordDelimiter": row_delim, "FieldDelimiter": column_delim, "QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
+          OutputSerialization={"CSV": {}},
+          Expression=query)
+
+      result = ""
+      for event in r['Payload']:
+          if 'Records' in event:
+              records = event['Records']['Payload'].decode('utf-8')
+              result += records
+
+      return result
+
+  run_s3select(
+      "my_bucket",
+      "my_csv_object",
+      "select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from stdin where a3>100 and a3<300;")
+
diff --git a/src/common/Formatter.cc b/src/common/Formatter.cc
index c66fb0cfe622..5d268aab7427 100644
--- a/src/common/Formatter.cc
+++ b/src/common/Formatter.cc
@@ -75,6 +75,8 @@ FormatterAttrs::FormatterAttrs(const char *attr, ...)
   va_end(ap);
 }
 
+void Formatter::write_bin_data(const char*, int){}
+
 Formatter::Formatter() { }
 
 Formatter::~Formatter() { }
@@ -544,6 +546,13 @@ void XMLFormatter::write_raw_data(const char *data)
   m_ss << data;
 }
 
+void XMLFormatter::write_bin_data(const char* buff, int buf_len)
+{
+  std::stringbuf *pbuf = m_ss.rdbuf();
+  pbuf->sputn(buff, buf_len);
+  m_ss.seekg(buf_len);
+}
+
 void XMLFormatter::get_attrs_str(const FormatterAttrs *attrs, std::string& attrs_str)
 {
   std::stringstream attrs_ss;
diff --git a/src/common/Formatter.h b/src/common/Formatter.h
index 228e22cfa1e8..e57ede878d9e 100644
--- a/src/common/Formatter.h
+++ b/src/common/Formatter.h
@@ -118,6 +118,7 @@ namespace ceph {
     virtual void *get_external_feature_handler(const std::string& feature) {
       return nullptr;
     }
+    virtual void write_bin_data(const char* buff, int buf_len);
   };
 
   class copyable_sstream : public std::stringstream {
@@ -226,6 +227,7 @@ namespace ceph {
     void dump_format_va(std::string_view name, const char *ns, bool quoted, const char *fmt, va_list ap) override;
     int get_len() const override;
     void write_raw_data(const char *data) override;
+    void write_bin_data(const char* buff, int len) override;
 
     /* with attrs */
     void open_array_section_with_attrs(std::string_view name, const FormatterAttrs& attrs) override;
diff --git a/src/rgw/rgw_rest.cc b/src/rgw/rgw_rest.cc
index 045710491075..c72cf9cf24cd 100644
--- a/src/rgw/rgw_rest.cc
+++ b/src/rgw/rgw_rest.cc
@@ -1891,6 +1891,7 @@ int RGWHandler_REST::read_permissions(RGWOp* op_obj)
     /* is it a 'create bucket' request? */
     if (op_obj->get_type() == RGW_OP_CREATE_BUCKET)
       return 0;
+    only_bucket = true;
     break;
   case OP_DELETE:
diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc
index e1fe39f99e55..75ba9f075187 100644
--- a/src/rgw/rgw_rest_s3.cc
+++ b/src/rgw/rgw_rest_s3.cc
@@ -17,6 +17,7 @@
 #include 
 #include 
 #include 
+#include 
 
 #include 
@@ -4585,6 +4586,9 @@ RGWOp *RGWHandler_REST_Obj_S3::op_post()
   if (s->info.args.exists("uploads"))
     return new RGWInitMultipart_ObjStore_S3;
+
+  if (is_select_op())
+    return new RGWSelectObj_ObjStore_S3;
 
   return new RGWPostObj_ObjStore_S3;
 }
@@ -5354,6 +5358,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
     case RGW_OP_PUT_BUCKET_PUBLIC_ACCESS_BLOCK:
     case RGW_OP_GET_BUCKET_PUBLIC_ACCESS_BLOCK:
     case RGW_OP_DELETE_BUCKET_PUBLIC_ACCESS_BLOCK:
+    case RGW_OP_GET_OBJ: // s3select is sent as a POST (the payload contains the query) while the operation itself is get-object
       break;
     default:
       dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
@@ -5943,3 +5948,291 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
   return route == AwsRoute::QUERY_STRING &&
          version == AwsVersion::UNKNOWN;
 }
+
+
+using namespace s3selectEngine;
+const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
+const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"};
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
+  s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+  m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
+  m_buff_header(std::make_unique<char[]>(1000)),
+  chunk_number(0),
+  crc32(std::unique_ptr<boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>>())
+{
+  set_get_data(true);
+}
+
+RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+{
+}
+
+int RGWSelectObj_ObjStore_S3::get_params()
+{
+
+  //retrieve s3-select query from payload
+  bufferlist data;
+  int ret;
+  int max_size = 4096;
+  std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
+  if (ret != 0) {
+    ldout(s->cct, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
+    return ret;
+  }
+
+  m_s3select_query = data.to_str();
+  if (m_s3select_query.length() > 0) {
+    ldout(s->cct, 10) << "s3-select query: " << m_s3select_query << dendl;
+  }
+  else {
+    ldout(s->cct, 10) << "s3-select query: failed to retrieve query;" << dendl;
+    return -1;
+  }
+
+  int status = handle_aws_cli_parameters(m_sql_query);
+
+  if (status<0) {
+    return status;
+  }
+
+  return RGWGetObj_ObjStore_S3::get_params();
+}
+
+void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i)
+{
+  short x = htons(s);
+  memcpy(buff, &x, sizeof(s));
+  i+=sizeof(s);
+}
+
+void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i)
+{
+  u_int32_t x = htonl(s);
+  memcpy(buff, &x, sizeof(s));
+  i+=sizeof(s);
+}
+
+int RGWSelectObj_ObjStore_S3::create_header_records(char* buff)
+{
+  int i = 0;
+
+  // header #1: ":event-type" = "Records"
+  buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
+  memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
+  i += strlen(header_name_str[EVENT_TYPE]);
+  buff[i++] = char(7);
+  encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
+  memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
+  i += strlen(header_value_str[RECORDS]);
+
+  // header #2: ":content-type" = "application/octet-stream"
+  buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
+  memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
+  i += strlen(header_name_str[CONTENT_TYPE]);
+  buff[i++] = char(7);
+  encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
+  memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
+  i += strlen(header_value_str[OCTET_STREAM]);
+
+  // header #3: ":message-type" = "event"
+  buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
+  memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
+  i += strlen(header_name_str[MESSAGE_TYPE]);
+  buff[i++] = char(7);
+  encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
+  memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
+  i += strlen(header_value_str[EVENT]);
+
+  return i;
+}
+
+int RGWSelectObj_ObjStore_S3::create_message(char* buff, u_int32_t result_len, u_int32_t header_len)
+{
+  u_int32_t total_byte_len = 0;
+  u_int32_t preload_crc = 0;
+  u_int32_t message_crc = 0;
+  int i = 0;
+
+  if(crc32 ==0) {
+    // the parameters are according to the CRC-32 algorithm and are aligned with the AWS-cli checksum
+    crc32 = std::unique_ptr<boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
+  }
+
+  total_byte_len = result_len + 16;
+
+  encode_int(&buff[i], total_byte_len, i);
+  encode_int(&buff[i], header_len, i);
+
+  crc32->reset();
+  *crc32 = std::for_each( buff, buff + 8, *crc32 );
+  preload_crc = (*crc32)();
+  encode_int(&buff[i], preload_crc, i);
+
+  i += result_len;
+
+  crc32->reset();
+  *crc32 = std::for_each( buff, buff + i, *crc32 );
+  message_crc = (*crc32)();
+  encode_int(&buff[i], message_crc, i);
+
+  return i;
+}
+
+#define PAYLOAD_LINE "\n\n\n\n"
+#define END_PAYLOAD_LINE "\n"
+
+int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
+{
+  int status = 0;
+  csv_object::csv_defintions csv;
+
+  m_result = "012345678901"; //12 bytes are reserved for the message prelude (total length, header length, prelude CRC)
+
+  int header_size = 0;
+
+  if (m_s3_csv_object==0) {
+    s3select_syntax->parse_query(query);
+
+    if (m_row_delimiter.size()) {
+      csv.row_delimiter = *m_row_delimiter.c_str();
+    }
+
+    if (m_column_delimiter.size()) {
+      csv.column_delimiter = *m_column_delimiter.c_str();
+    }
+
+    if (m_quot.size()) {
+      csv.quot_char = *m_quot.c_str();
+    }
+
+    if (m_escape_char.size()) {
+      csv.escape_char = *m_escape_char.c_str();
+    }
+
+    if(m_header_info.compare("IGNORE")==0) {
+      csv.ignore_header_info=true;
+    }
+    else if(m_header_info.compare("USE")==0) {
+      csv.use_header_info=true;
+    }
+
+    m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
+  }
+
+  if (s3select_syntax->get_error_description().empty() == false) {
+    header_size = create_header_records(m_buff_header.get());
+    m_result.append(m_buff_header.get(), header_size);
+    m_result.append(PAYLOAD_LINE);
+    m_result.append(s3select_syntax->get_error_description());
+    ldout(s->cct, 10) << "s3-select query: failed to parse query; {" << s3select_syntax->get_error_description() << "}"<< dendl;
+    status = -1;
+  }
+  else {
+    header_size = create_header_records(m_buff_header.get());
+    m_result.append(m_buff_header.get(), header_size);
+    m_result.append(PAYLOAD_LINE);
+    status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
+    if(status<0) {
+      m_result.append(m_s3_csv_object->get_error_description());
+    }
+  }
+
+  if (m_result.size() > strlen(PAYLOAD_LINE)) {
+    m_result.append(END_PAYLOAD_LINE);
+    int buff_len = create_message(m_result.data(), m_result.size() - 12, header_size);
+    s->formatter->write_bin_data(m_result.data(), buff_len);
+    if (op_ret < 0) {
+      return op_ret;
+    }
+  }
+  rgw_flush_formatter_and_reset(s, s->formatter);
+
+  return status;
+}
+
+int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
+{
+
+  if(chunk_number !=0) {
+    return 0;
+  }
+
+#define GT "&gt;"
+#define LT "&lt;"
+  if (m_s3select_query.find(GT) != std::string::npos) {
+    boost::replace_all(m_s3select_query, GT, ">");
+  }
+  if (m_s3select_query.find(LT) != std::string::npos) {
+    boost::replace_all(m_s3select_query, LT, "<");
+  }
+
+  //AWS cli s3select parameters
+  extract_by_tag("Expression", sql_query);
+  extract_by_tag("FieldDelimiter", m_column_delimiter);
+  extract_by_tag("QuoteCharacter", m_quot);
+  extract_by_tag("RecordDelimiter", m_row_delimiter);
+  if (m_row_delimiter.size()==0) {
+    m_row_delimiter='\n';
+  }
+
+  extract_by_tag("QuoteEscapeCharacter", m_escape_char);
+  extract_by_tag("CompressionType", m_compression_type);
+  if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
+    ldout(s->cct, 10) << "RGW currently supports only the NONE option for compression type" << dendl;
+    return -1;
+  }
+
+  extract_by_tag("FileHeaderInfo", m_header_info);
+
+  return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string tag_name, std::string& result)
+{
+  result = "";
+  size_t _qs = m_s3select_query.find("<" + tag_name + ">", 0) + tag_name.size() + 2;
+  if (_qs == std::string::npos) {
+    return -1;
+  }
+  size_t _qe = m_s3select_query.find("</" + tag_name + ">", _qs);
+  if (_qe == std::string::npos) {
+    return -1;
+  }
+
+  result = m_s3select_query.substr(_qs, _qe - _qs);
+
+  return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
+{
+  if (len == 0) {
+    return 0;
+  }
+
+  if (chunk_number == 0) {
+    if (op_ret < 0) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+  }
+
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  if (chunk_number == 0) {
+    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  }
+
+  int status=0;
+  for(auto& it : bl.buffers()) {
+    status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
+    if(status<0) {
+      break;
+    }
+  }
+
+  chunk_number++;
+
+  return status;
+}
diff --git a/src/rgw/rgw_rest_s3.h b/src/rgw/rgw_rest_s3.h
index a4f9fb41ce3a..42ea0231e1a3 100644
--- a/src/rgw/rgw_rest_s3.h
+++ b/src/rgw/rgw_rest_s3.h
@@ -9,6 +9,7 @@
 #include 
 #include 
+#include 
 
 #include "common/sstring.hh"
 #include "rgw_op.h"
@@ -728,8 +729,12 @@ protected:
     return s->info.args.exists("legal-hold");
   }
 
+  bool is_select_op() const {
+    return s->info.args.exists("select-type");
+  }
+
   bool is_obj_update_op() const override {
-    return is_acl_op() || is_tagging_op() || is_obj_retention_op() || is_obj_legal_hold_op();
+    return is_acl_op() || is_tagging_op() || is_obj_retention_op() || is_obj_legal_hold_op() || is_select_op();
   }
 
   RGWOp *get_obj_op(bool get_data);
@@ -875,6 +880,75 @@ static inline int valid_s3_bucket_name(const string& name, bool relaxed=false)
   return 0;
 }
 
+namespace s3selectEngine
+{
+class s3select;
+class csv_object;
+}
+
+class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
+{
+
+private:
+  std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
+  std::string m_s3select_query;
+  std::string m_result;
+  std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
+  std::string m_column_delimiter;
+  std::string m_quot;
+  std::string m_row_delimiter;
+  std::string m_compression_type;
+  std::string m_escape_char;
+  std::unique_ptr<char[]> m_buff_header;
+  std::string m_header_info;
+  std::string m_sql_query;
+
+public:
+  unsigned int chunk_number;
+
+  enum header_name_En
+  {
+    EVENT_TYPE,
+    CONTENT_TYPE,
+    MESSAGE_TYPE
+  };
+  static const char* header_name_str[3];
+
+  enum header_value_En
+  {
+    RECORDS,
+    OCTET_STREAM,
+    EVENT
+  };
+  static const char* header_value_str[3];
+
+  RGWSelectObj_ObjStore_S3();
+  virtual ~RGWSelectObj_ObjStore_S3();
+
+  virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
+
+  virtual int get_params() override;
+
+private:
+  void encode_short(char* buff, uint16_t s, int& i);
+
+  void encode_int(char* buff, u_int32_t s, int& i);
+
+  int create_header_records(char* buff);
+
+  std::unique_ptr<boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>> crc32;
+
+  int create_message(char* buff, u_int32_t result_len, u_int32_t header_len);
+
+  int run_s3select(const char* query, const char* input, size_t input_length);
+
+  int extract_by_tag(std::string tag_name, std::string& result);
+
+  void convert_escape_seq(std::string& esc);
+
+  int handle_aws_cli_parameters(std::string& sql_query);
+};
+
 namespace rgw::auth::s3 {
diff --git a/src/s3select b/src/s3select
new file mode 160000
index 000000000000..7ae7a12c138d
--- /dev/null
+++ b/src/s3select
@@ -0,0 +1 @@
+Subproject commit 7ae7a12c138d4607d6c012228c06f3802c493c49