adding s3select sub-module; integrating sub-module into RGW; current commit is able...
author gal salomon <gal.salomon@gmail.com>
Wed, 12 Feb 2020 04:21:54 +0000 (06:21 +0200)
committer gal salomon <gal.salomon@gmail.com>
Tue, 23 Jun 2020 02:19:02 +0000 (05:19 +0300)
Signed-off-by: gal salomon <gal.salomon@gmail.com>
.gitmodules
doc/radosgw/index.rst
doc/radosgw/s3select.rst [new file with mode: 0644]
src/common/Formatter.cc
src/common/Formatter.h
src/rgw/rgw_rest.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/s3select [new submodule]

index eead9cd910f3d01bd5bf7828c51386e95c747f6e..6280d8e93eed2d2017c8165913ade7e425a8d9ea 100644 (file)
@@ -67,3 +67,6 @@
 [submodule "src/pybind/mgr/rook/rook-client-python"]
        path = src/pybind/mgr/rook/rook-client-python
        url = https://github.com/ceph/rook-client-python.git
+[submodule "s3select"]
+       path = src/s3select
+       url = https://github.com/ceph/s3select.git
index 7c85ade8cfee661880f22050de3794c2277d82fe..1d11e2a1d30538f9801462fa1914265688f43a52 100644 (file)
@@ -78,4 +78,5 @@ you may write data with one API and retrieve it with the other.
    Manpage radosgw <../../man/8/radosgw>
    Manpage radosgw-admin <../../man/8/radosgw-admin>
    QAT Acceleration for Encryption and Compression <qat-accel>
+   S3-select <s3select>
 
diff --git a/doc/radosgw/s3select.rst b/doc/radosgw/s3select.rst
new file mode 100644 (file)
index 0000000..dc6415a
--- /dev/null
@@ -0,0 +1,268 @@
+===============
+ Ceph s3 select
+===============
+
+.. contents::
+
+Overview
+--------
+
+    | The purpose of the **s3 select** engine is to create an efficient pipe between the user client and the storage nodes (the engine should be as close as possible to the storage).
+    | It enables the selection of a restricted subset of (structured) data stored in an S3 object, using an SQL-like syntax.
+    | It also enables higher-level analytic applications (such as Spark-SQL) to use this feature to improve their latency and throughput.
+
+    | For example, consider a CSV-formatted s3-object of several GB from which a user needs to extract a single column, filtered by another column,
+    | as in the following query:
+    | ``select customer-id from s3Object where age>30 and age<65;``
+
+    | Currently the whole s3-object must be retrieved from the OSD via RGW before filtering and extracting data.
+    | By "pushing down" the query into the OSD, it is possible to save a great deal of network traffic and CPU time (serialization / deserialization).
+
+    | **The bigger the object, and the more accurate the query, the better the performance**.
+
+Basic workflow
+--------------
+    
+    | An s3-select query is sent to RGW via the `AWS-CLI <https://docs.aws.amazon.com/cli/latest/reference/s3api/select-object-content.html>`_.
+
+    | It passes the authentication and permission process as an incoming (POST) message.
+    | **RGWSelectObj_ObjStore_S3::send_response_data** is the "entry point"; it handles each fetched chunk according to the input object-key.
+    | **send_response_data** first handles the input query: it extracts the query and the other CLI parameters.
+   
+    | For each newly fetched chunk (~4 MB), RGW executes the s3-select query on it.
+    | The current implementation supports CSV objects, and since chunk boundaries randomly "cut" CSV rows in the middle, those broken lines (the first or last of a chunk) are skipped while processing the query.
+    | Each "broken" line is stored and later merged with the next broken line (belonging to the next chunk), and finally processed, as sketched below.
+   
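+    | The skipping and merging of broken lines can be illustrated with the following minimal sketch (Python; ``process_chunk`` is a hypothetical helper, the actual logic lives in the C++ engine):
+
+::
+
+ def process_chunk(chunk, carry):
+     # prepend the partial last row carried over from the previous chunk
+     data = carry + chunk
+     last_nl = data.rfind("\n")
+     if last_nl == -1:
+         return [], data              # no complete row yet; carry everything forward
+     complete, carry = data[:last_nl + 1], data[last_nl + 1:]
+     return complete.splitlines(), carry
+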
+    | For each processed chunk, an output message is formatted according to the `AWS specification <https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTObjectSELECTContent.html#archive-RESTObjectSELECTContent-responses>`_ and sent back to the client.
+    | RGW supports the following response: ``{:event-type,records} {:content-type,application/octet-stream} {:message-type,event}``.
+    | For aggregation queries the last chunk must be identified as the end of input; at that point the s3-select engine initiates its end-of-process and produces the aggregated result.
+
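+    | The response framing can be illustrated with the following minimal sketch (Python, using ``zlib.crc32``, which shares the standard CRC-32 parameters with the implementation):
+
+::
+
+ import struct, zlib
+
+ def encode_message(headers, payload):
+     # prelude: total message length and headers length, both big-endian,
+     # followed by a CRC-32 of these first 8 bytes
+     total_len = 12 + len(headers) + len(payload) + 4
+     prelude = struct.pack(">II", total_len, len(headers))
+     prelude += struct.pack(">I", zlib.crc32(prelude))
+     body = prelude + headers + payload
+     # the trailing CRC-32 covers everything that precedes it
+     return body + struct.pack(">I", zlib.crc32(body))
+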
+        
+Basic functionalities
+~~~~~~~~~~~~~~~~~~~~~
+
+    | **S3select** has a definite set of functionalities that must be implemented (if we wish to stay compliant with AWS); currently only a portion of it is implemented.
+    
+    | The implemented software architecture supports basic arithmetic expressions, logical and compare expressions, including nested function calls and casting operators; that alone gives the user reasonable flexibility.
+    | Review the feature-table_ below.
+
+
+Error Handling
+~~~~~~~~~~~~~~
+
+    | Any error that occurs while processing the input query, i.e. during the parsing phase or the execution phase, is returned to the client as a response error message.
+
+    | An error of fatal severity (attached to the exception) ends query execution immediately; errors of other severities are counted, and upon reaching 100 of them, query execution ends with an error message.
+
+
+
+
+
+Features Support
+----------------
+
+.. _feature-table:
+
+  | Currently only part of the `AWS select command <https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-select.html>`_ is implemented.
+  | The following table describes the currently supported s3-select functionalities:
+
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Feature                         | Detailed        | Example                                                               |
++=================================+=================+=======================================================================+
+| Arithmetic operators            | ^ * / + - ( )   | select (int(_1)+int(_2))*int(_9) from stdin;                          |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+|                                 |                 | select ((1+2)*3.14) ^ 2 from stdin;                                   |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Compare operators               | > < >= <= == != | select _1,_2 from stdin where (int(_1)+int(_3))>int(_5);              |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Logical operator                | AND OR          | select count(*) from stdin where int(_1)>123 and int(_5)<200;         |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Casting operator                | int(expression) | select int(_1),int( 1.2 + 3.4) from stdin;                            |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+|                                 |float(expression)| select float(1.2) from stdin;                                         |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+|                                 | timestamp(...)  | select timestamp("1999:10:10-12:23:44") from stdin;                   |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function            | sum             | select sum(int(_1)) from stdin;                                       |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function            | min             | select min( int(_1) * int(_5) ) from stdin;                           |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function            | max             | select max(float(_1)),min(int(_5)) from stdin;                        |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Aggregation Function            | count           | select count(*) from stdin where (int(_1)+int(_3))>int(_5);           |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions             | extract         | select count(*) from stdin where                                      |
+|                                 |                 | extract("year",timestamp(_2)) > 1950                                  |    
+|                                 |                 | and extract("year",timestamp(_1)) < 1960;                             |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions             | dateadd         | select count(0) from stdin where                                      |
+|                                 |                 | datediff("year",timestamp(_1),dateadd("day",366,timestamp(_1))) == 1; |  
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions             | datediff        | select count(0) from stdin where                                      |  
+|                                 |                 | datediff("month",timestamp(_1),timestamp(_2)) == 2;                   |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Timestamp Functions             | utcnow          | select count(0) from stdin where                                      |
+|                                 |                 | datediff("hours",utcnow(),dateadd("day",1,utcnow())) == 24 ;          |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| String Functions                | substr          | select count(0) from stdin where                                      |
+|                                 |                 | int(substr(_1,1,4))>1950 and int(substr(_1,1,4))<1960;                |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Alias support                   |                 |  select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3                  |
+|                                 |                 |  from stdin where a3>100 and a3<300;                                  |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+
+s3-select function interfaces
+-----------------------------
+
+Timestamp functions
+~~~~~~~~~~~~~~~~~~~
+    | The `timestamp functionalities <https://docs.aws.amazon.com/AmazonS3/latest/dev/s3-glacier-select-sql-reference-date.html>`_ are partially implemented.
+    | The casting operator ``timestamp( string )`` converts a string to the timestamp basic type.
+    | Currently it can convert the following pattern: ``yyyy:mm:dd hh:mi:dd``.
+
+    | ``extract( date-part , timestamp )``: returns an integer according to the date-part extracted from the input timestamp.
+    | Supported date-parts: year, month, week, day.
+
+    | ``dateadd( date-part , integer , timestamp )``: returns a timestamp, the result of adding the given number of date-part units to the input timestamp.
+    | Supported date-parts: year, month, day.
+
+    | ``datediff( date-part , timestamp , timestamp )``: returns an integer, the difference between the two timestamps in units of the date-part.
+    | Supported date-parts: year, month, day, hours.
+
+
+    | ``utcnow()``: returns the timestamp of the current time.
+
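+    | For example, using the ``run_s3select`` helper defined in the BOTO3 section below, a year-range filter might look as follows (the column layout here is hypothetical):
+
+::
+
+ run_s3select("my_bucket", "my_csv_object",
+              'select count(0) from stdin where extract("year",timestamp(_1)) > 1950;')
+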
+Aggregation functions
+~~~~~~~~~~~~~~~~~~~~~
+
+    | ``count()``: returns an integer equal to the number of rows matching the condition (if one exists).
+
+    | ``sum(expression)``: returns the sum of the expression over all rows matching the condition (if one exists).
+
+    | ``max(expression)``: returns the maximal result of the expression over all rows matching the condition (if one exists).
+
+    | ``min(expression)``: returns the minimal result of the expression over all rows matching the condition (if one exists).
+
+String functions
+~~~~~~~~~~~~~~~~
+
+    | ``substr( string , from , to )``: returns the part of the input string selected by the from and to inputs.
+
+
+Alias
+~~~~~
+    | The **alias** programming construct is an essential part of the s3-select language; it enables much better programming, especially with objects containing many columns or with complex queries.
+
+    | Upon parsing a statement containing an alias construct, the engine replaces the alias with a reference to the correct projection column; at query execution time the reference is evaluated like any other expression.
+
+    | There is a risk that a self (or cyclic) reference may occur, causing a stack overflow (endless loop); to address that concern, every alias evaluation is validated against cyclic references.
+
+    | An alias also maintains a result cache: when the same alias is used more than once, the engine does not evaluate the same expression again but returns the cached result.
+
+    | Of course, the cache is invalidated for each new row, as sketched below.
+
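+    | The caching and cycle validation can be illustrated with the following minimal sketch (Python; the class and its methods are hypothetical, for illustration only):
+
+::
+
+ class AliasEval:
+     def __init__(self, expressions):
+         self.expressions = expressions   # alias-name -> expression (a callable taking a resolver)
+         self.cache = {}                  # per-row result cache
+
+     def new_row(self):
+         self.cache.clear()               # the cache is invalidated per each new row
+
+     def evaluate(self, name, in_progress=frozenset()):
+         if name in in_progress:          # a self/cyclic reference would loop forever
+             raise ValueError("cyclic alias reference: " + name)
+         if name not in self.cache:       # evaluate each alias at most once per row
+             resolver = lambda n: self.evaluate(n, in_progress | {name})
+             self.cache[name] = self.expressions[name](resolver)
+         return self.cache[name]
+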
+Sending Query to RGW
+--------------------
+
+   | Any HTTP client can send an s3-select request to RGW; the request must comply with the `AWS Request syntax <https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html#API_SelectObjectContent_RequestSyntax>`_.
+
+
+
+   | Sending an s3-select request to RGW using the AWS CLI should follow the `AWS command reference <https://docs.aws.amazon.com/cli/latest/reference/s3api/select-object-content.html>`_.
+   | Below is an example:
+
+::
+
+ aws --endpoint-url http://localhost:8000 s3api select-object-content \
+  --bucket {BUCKET-NAME} \
+  --expression-type 'SQL' \
+  --input-serialization \
+  '{"CSV": {"FieldDelimiter": "," , "QuoteCharacter": "\"" , "RecordDelimiter" : "\n" , "QuoteEscapeCharacter" : "\\" , "FileHeaderInfo": "USE" }, "CompressionType": "NONE"}' \
+  --output-serialization '{"CSV": {}}' \
+  --key {OBJECT-NAME} \
+  --expression "select count(0) from stdin where int(_1)<10;" output.csv
+
+Syntax
+~~~~~~
+
+    | **Input serialization** (implemented) lets the user define the CSV format; the default values are {\\n} for the row delimiter, {,} for the field delimiter, {"} for the quote character, and {\\} for the escape character.
+    | It also handles the **csv-header-info**, i.e. the case where the first row of the input object contains the schema.
+    | **Output serialization** is currently not implemented, and the same goes for **compression-type**.
+
+    | The s3-select engine contains a CSV parser, which parses s3-objects as follows:
+    | - each row ends with the row delimiter.
+    | - the field separator separates adjacent columns; successive field separators define a NULL column.
+    | - the quote character overrides the field separator, so the field separator is treated as an ordinary character between quotes.
+    | - the escape character disables any special meaning of the character that follows it, except for the row delimiter.
+
+    | Below are examples of these CSV parsing rules.
+
+
+CSV parsing behavior
+--------------------
+
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+| Feature                         | Description     | input ==> tokens                                                      |
++=================================+=================+=======================================================================+
+|     NULL                        | successive      | ,,1,,2,    ==> {null}{null}{1}{null}{2}{null}                         |
+|                                 | field delimiter |                                                                       |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+|     QUOTE                       | quote character | 11,22,"a,b,c,d",last ==> {11}{22}{"a,b,c,d"}{last}                    |
+|                                 | overrides       |                                                                       |
+|                                 | field delimiter |                                                                       |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+|     Escape                      | escape char     | 11,22,str=\\"abcd\\"\\,str2=\\"123\\",last                            |
+|                                 | overrides       | ==> {11}{22}{str="abcd",str2="123"}{last}                             |
+|                                 | meta-character. |                                                                       |
+|                                 | escape removed  |                                                                       |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+|     row delimiter               | no close quote, | 11,22,a="str,44,55,66                                                 |
+|                                 | row delimiter is| ==> {11}{22}{a="str,44,55,66}                                         |
+|                                 | closing line    |                                                                       |
++---------------------------------+-----------------+-----------------------------------------------------------------------+
+|     csv header info             | FileHeaderInfo  | "**USE**" value means each token on first line is column-name,        |
+|                                 | tag             | "**IGNORE**" value means to skip the first line                       |
++---------------------------------+-----------------+-----------------------------------------------------------------------+       
+
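+    | A minimal sketch of these parsing rules (Python; illustrative only, the real parser is the C++ implementation inside the s3select submodule):
+
+::
+
+ def tokenize_row(line, delim=",", quote='"', esc="\\"):
+     tokens, cur, in_quote, i = [], "", False, 0
+     while i < len(line):
+         c = line[i]
+         if c == esc and i + 1 < len(line):
+             cur += line[i + 1]           # escape disables the next special character
+             i += 2
+             continue
+         if c == quote:
+             in_quote = not in_quote      # quote overrides the field delimiter
+             cur += c
+         elif c == delim and not in_quote:
+             tokens.append(cur or None)   # successive delimiters yield NULL columns
+             cur = ""
+         else:
+             cur += c
+         i += 1
+     tokens.append(cur or None)
+     return tokens
+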
+
+BOTO3
+-----
+
+ | Using BOTO3 is "natural" and easy, since it exposes the same select-object-content API as the AWS CLI. Below is an equivalent example.
+
+::
+
+
+ import boto3
+
+ # endpoint, access_key, secret_key and region_name are assumed to be defined by the caller
+ def run_s3select(bucket, key, query, column_delim=",", row_delim="\n", quot_char='"', esc_char='\\', csv_header_info="NONE"):
+    s3 = boto3.client('s3',
+        endpoint_url=endpoint,
+        aws_access_key_id=access_key,
+        region_name=region_name,
+        aws_secret_access_key=secret_key)
+
+    r = s3.select_object_content(
+        Bucket=bucket,
+        Key=key,
+        ExpressionType='SQL',
+        InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
+        OutputSerialization = {"CSV": {}},
+        Expression=query,)
+
+    result = ""
+    for event in r['Payload']:
+        if 'Records' in event:
+            records = event['Records']['Payload'].decode('utf-8')
+            result += records
+
+    return result
+
+
+
+
+ result = run_s3select(
+     "my_bucket",
+     "my_csv_object",
+     "select int(_1) as a1, int(_2) as a2 , (a1+a2) as a3 from stdin where a3>100 and a3<300;")
+
index c66fb0cfe622658dc0b40481f1bee472dc47c7ad..5d268aab742753ae364201739cc20ce0438332e2 100644 (file)
@@ -75,6 +75,8 @@ FormatterAttrs::FormatterAttrs(const char *attr, ...)
   va_end(ap);
 }
 
+// default no-op; formatters that support raw binary output override this
+void Formatter::write_bin_data(const char*, int){}
+
 Formatter::Formatter() { }
 
 Formatter::~Formatter() { }
@@ -544,6 +546,13 @@ void XMLFormatter::write_raw_data(const char *data)
   m_ss << data;
 }
 
+void XMLFormatter::write_bin_data(const char* buff, int buf_len)
+{
+  // write buf_len bytes of raw binary data directly into the underlying stream buffer
+  std::stringbuf *pbuf = m_ss.rdbuf();
+  pbuf->sputn(buff, buf_len);
+  m_ss.seekg(buf_len);
+}
+
 void XMLFormatter::get_attrs_str(const FormatterAttrs *attrs, std::string& attrs_str)
 {
   std::stringstream attrs_ss;
index 228e22cfa1e890a184fa0a97428cf1baf4a8b384..e57ede878d9e5279943b0635203739a1870d5157 100644 (file)
@@ -118,6 +118,7 @@ namespace ceph {
     virtual void *get_external_feature_handler(const std::string& feature) {
       return nullptr;
     }
+    virtual void write_bin_data(const char* buff, int buf_len);
   };
 
   class copyable_sstream : public std::stringstream {
@@ -226,6 +227,7 @@ namespace ceph {
     void dump_format_va(std::string_view name, const char *ns, bool quoted, const char *fmt, va_list ap) override;
     int get_len() const override;
     void write_raw_data(const char *data) override;
+    void write_bin_data(const char* buff, int len) override;
 
     /* with attrs */
     void open_array_section_with_attrs(std::string_view name, const FormatterAttrs& attrs) override;
index 04571049107514081da70c52bee1d7adc8b46b93..c72cf9cf24cd9237c495f640d3968e50bcca2b5b 100644 (file)
@@ -1891,6 +1891,7 @@ int RGWHandler_REST::read_permissions(RGWOp* op_obj)
     /* is it a 'create bucket' request? */
     if (op_obj->get_type() == RGW_OP_CREATE_BUCKET)
       return 0;
+    
     only_bucket = true;
     break;
   case OP_DELETE:
index e1fe39f99e552a0030062523957933e92f3ee6ec..75ba9f075187d2972e4f6a0a3e32da8e8593e3cb 100644 (file)
@@ -17,6 +17,7 @@
 #include <boost/algorithm/string/replace.hpp>
 #include <boost/utility/string_view.hpp>
 #include <boost/tokenizer.hpp>
+#include <s3select/include/s3select.h>
 
 #include <liboath/oath.h>
 
@@ -4585,6 +4586,9 @@ RGWOp *RGWHandler_REST_Obj_S3::op_post()
 
   if (s->info.args.exists("uploads"))
     return new RGWInitMultipart_ObjStore_S3;
+  
+  if (is_select_op())
+    return new RGWSelectObj_ObjStore_S3;
 
   return new RGWPostObj_ObjStore_S3;
 }
@@ -5354,6 +5358,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
         case RGW_OP_PUT_BUCKET_PUBLIC_ACCESS_BLOCK:
         case RGW_OP_GET_BUCKET_PUBLIC_ACCESS_BLOCK:
         case RGW_OP_DELETE_BUCKET_PUBLIC_ACCESS_BLOCK:
+        case RGW_OP_GET_OBJ: // s3select is sent as a POST (the payload contains the query), but the request is handled as get-object
           break;
         default:
           dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
@@ -5943,3 +5948,291 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
 
   return route == AwsRoute::QUERY_STRING && version == AwsVersion::UNKNOWN;
 }
+
+
+using namespace s3selectEngine;
+const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
+const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"};
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
+  s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+  m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
+  m_buff_header(std::make_unique<char[]>(1000)),
+  chunk_number(0),
+  crc32(std::unique_ptr<boost::crc_32_type>())
+{
+  set_get_data(true);
+}
+
+RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+{
+}
+
+int RGWSelectObj_ObjStore_S3::get_params()
+{
+
+  //retrieve s3-select query from payload
+  bufferlist data;
+  int ret;
+  int max_size = 4096;
+  std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
+  if (ret != 0) {
+    ldout(s->cct, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
+    return ret;
+  }
+
+  m_s3select_query = data.to_str();
+  if (m_s3select_query.length() > 0) {
+    ldout(s->cct, 10) << "s3-select query: " << m_s3select_query << dendl;
+  }
+  else {
+    ldout(s->cct, 10) << "s3-select query: failed to retrieve query;" << dendl;
+    return -1;
+  }
+
+  int status = handle_aws_cli_parameters(m_sql_query);
+
+  if (status<0) {
+    return status;
+  }
+
+  return RGWGetObj_ObjStore_S3::get_params();
+}
+
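+// encode_short/encode_int write values in network (big-endian) byte order,
+// as required by the AWS event-stream encoding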
+void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i)
+{
+  short x = htons(s);
+  memcpy(buff, &x, sizeof(s));
+  i+=sizeof(s);
+}
+
+void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i)
+{
+  u_int32_t x = htonl(s);
+  memcpy(buff, &x, sizeof(s));
+  i+=sizeof(s);
+}
+
+int RGWSelectObj_ObjStore_S3::create_header_records(char* buff)
+{
+  int i = 0;
+
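+  // each header is encoded as: name-length (1 byte), name,
+  // value-type (1 byte, 7 = string), value-length (2 bytes, big-endian), value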
+  //1
+  buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
+  memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
+  i += strlen(header_name_str[EVENT_TYPE]);
+  buff[i++] = char(7);
+  encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
+  memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
+  i += strlen(header_value_str[RECORDS]);
+
+  //2
+  buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
+  memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
+  i += strlen(header_name_str[CONTENT_TYPE]);
+  buff[i++] = char(7);
+  encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
+  memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
+  i += strlen(header_value_str[OCTET_STREAM]);
+
+  //3
+  buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
+  memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
+  i += strlen(header_name_str[MESSAGE_TYPE]);
+  buff[i++] = char(7);
+  encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
+  memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
+  i += strlen(header_value_str[EVENT]);
+
+  return i;
+}
+
+int RGWSelectObj_ObjStore_S3::create_message(char* buff, u_int32_t result_len, u_int32_t header_len)
+{
+  u_int32_t total_byte_len = 0;
+  u_int32_t preload_crc = 0;
+  u_int32_t message_crc = 0;
+  int i = 0;
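+  // message layout: [ total-length(4) | headers-length(4) | prelude-CRC(4) ] headers payload message-CRC(4)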
+
+  if(crc32 ==0) {
+    // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
+    crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
+  }
+
+  total_byte_len = result_len + 16;
+
+  encode_int(&buff[i], total_byte_len, i);
+  encode_int(&buff[i], header_len, i);
+
+  crc32->reset();
+  *crc32 = std::for_each( buff, buff + 8, *crc32 );
+  preload_crc = (*crc32)();
+  encode_int(&buff[i], preload_crc, i);
+
+  i += result_len;
+
+  crc32->reset();
+  *crc32 = std::for_each( buff, buff + i, *crc32 );
+  message_crc = (*crc32)();
+  encode_int(&buff[i], message_crc, i);
+
+  return i;
+}
+
+#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
+#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"
+
+int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
+{
+  int status = 0;
+  csv_object::csv_defintions csv;
+
+  m_result = "012345678901"; //12 positions for header-crc
+
+  int header_size = 0;
+
+  if (m_s3_csv_object==0) {
+    s3select_syntax->parse_query(query);
+
+    if (m_row_delimiter.size()) {
+      csv.row_delimiter = *m_row_delimiter.c_str();
+    }
+
+    if (m_column_delimiter.size()) {
+      csv.column_delimiter = *m_column_delimiter.c_str();
+    }
+
+    if (m_quot.size()) {
+      csv.quot_char = *m_quot.c_str();
+    }
+
+    if (m_escape_char.size()) {
+      csv.escape_char = *m_escape_char.c_str();
+    }
+
+    if(m_header_info.compare("IGNORE")==0) {
+      csv.ignore_header_info=true;
+    }
+    else if(m_header_info.compare("USE")==0) {
+      csv.use_header_info=true;
+    }
+
+    m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
+  }
+
+  if (s3select_syntax->get_error_description().empty() == false) {
+    header_size = create_header_records(m_buff_header.get());
+    m_result.append(m_buff_header.get(), header_size);
+    m_result.append(PAYLOAD_LINE);
+    m_result.append(s3select_syntax->get_error_description());
+    ldout(s->cct, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl;
+    status = -1;
+  }
+  else {
+    header_size = create_header_records(m_buff_header.get());
+    m_result.append(m_buff_header.get(), header_size);
+    m_result.append(PAYLOAD_LINE);
+    status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
+    if(status<0) {
+      m_result.append(m_s3_csv_object->get_error_description());
+    }
+  }
+
+  if (m_result.size() > strlen(PAYLOAD_LINE)) {
+    m_result.append(END_PAYLOAD_LINE);
+    int buff_len = create_message(m_result.data(), m_result.size() - 12, header_size);
+    s->formatter->write_bin_data(m_result.data(), buff_len);
+    if (op_ret < 0) {
+      return op_ret;
+    }
+  }
+  rgw_flush_formatter_and_reset(s, s->formatter);
+
+  return status;
+}
+
+int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
+{
+
+  if(chunk_number !=0) {
+    return 0;
+  }
+
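+  // the request payload is XML, so comparison operators inside the SQL expression
+  // arrive as escaped entities and must be restored before parsing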
+#define GT "&gt;"
+#define LT "&lt;"
+  if (m_s3select_query.find(GT) != std::string::npos) {
+    boost::replace_all(m_s3select_query, GT, ">");
+  }
+  if (m_s3select_query.find(LT) != std::string::npos) {
+    boost::replace_all(m_s3select_query, LT, "<");
+  }
+
+  //AWS cli s3select parameters
+  extract_by_tag("Expression", sql_query);
+  extract_by_tag("FieldDelimiter", m_column_delimiter);
+  extract_by_tag("QuoteCharacter", m_quot);
+  extract_by_tag("RecordDelimiter", m_row_delimiter);
+  if (m_row_delimiter.size()==0) {
+    m_row_delimiter='\n';
+  }
+
+  extract_by_tag("QuoteEscapeCharacter", m_escape_char);
+  extract_by_tag("CompressionType", m_compression_type);
+  if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
+    ldout(s->cct, 10) << "RGW supports currently only NONE option for compression type" << dendl;
+    return -1;
+  }
+
+  extract_by_tag("FileHeaderInfo", m_header_info);
+
+  return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string tag_name, std::string& result)
+{
+  result = "";
+  size_t _qs = m_s3select_query.find("<" + tag_name + ">", 0);
+  if (_qs == std::string::npos) {
+    return -1;
+  }
+  _qs += tag_name.size() + 2;
+  size_t _qe = m_s3select_query.find("</" + tag_name + ">", _qs);
+  if (_qe == std::string::npos) {
+    return -1;
+  }
+
+  result = m_s3select_query.substr(_qs, _qe - _qs);
+
+  return 0;
+}
+
+int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
+{
+  if (len == 0) {
+    return 0;
+  }
+
+  if (chunk_number == 0) {
+    if (op_ret < 0) {
+      set_req_state_err(s, op_ret);
+    }
+    dump_errno(s);
+  }
+
+  // Explicitly use chunked transfer encoding so that we can stream the result
+  // to the user without having to wait for the full length of it.
+  if (chunk_number == 0) {
+    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  }
+
+  int status=0;
+  for(auto& it : bl.buffers()) {
+    status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
+    if(status<0) {
+      break;
+    }
+  }
+
+  chunk_number++;
+
+  return status;
+}
index a4f9fb41ce3abe00efaeee2f5ce78836d956ef72..42ea0231e1a308d71fb1c7b374f00cdb6f5c2254 100644 (file)
@@ -9,6 +9,7 @@
 
 #include <boost/utility/string_view.hpp>
 #include <boost/container/static_vector.hpp>
+#include <boost/crc.hpp> 
 
 #include "common/sstring.hh"
 #include "rgw_op.h"
@@ -728,8 +729,12 @@ protected:
     return s->info.args.exists("legal-hold");
   }
 
+  bool is_select_op() const {
+    return s->info.args.exists("select-type");
+  }
+
   bool is_obj_update_op() const override {
-    return is_acl_op() || is_tagging_op() || is_obj_retention_op() || is_obj_legal_hold_op();
+    return is_acl_op() || is_tagging_op() || is_obj_retention_op() || is_obj_legal_hold_op() || is_select_op();
   }
   RGWOp *get_obj_op(bool get_data);
 
@@ -875,6 +880,75 @@ static inline int valid_s3_bucket_name(const string& name, bool relaxed=false)
   return 0;
 }
 
+namespace s3selectEngine
+{
+class s3select;
+class csv_object;
+}
+
+class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
+{
+
+private:
+  std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
+  std::string m_s3select_query;
+  std::string m_result;
+  std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
+  std::string m_column_delimiter;
+  std::string m_quot;
+  std::string m_row_delimiter;
+  std::string m_compression_type;
+  std::string m_escape_char;
+  std::unique_ptr<char[]>  m_buff_header;
+  std::string m_header_info;
+  std::string m_sql_query;
+
+public:
+  unsigned int chunk_number;
+
+  enum header_name_En
+  {
+    EVENT_TYPE,
+    CONTENT_TYPE,
+    MESSAGE_TYPE
+  };
+  static const char* header_name_str[3];
+
+  enum header_value_En
+  {
+    RECORDS,
+    OCTET_STREAM,
+    EVENT
+  };
+  static const char* header_value_str[3];
+
+  RGWSelectObj_ObjStore_S3();
+  virtual ~RGWSelectObj_ObjStore_S3();
+
+  virtual int send_response_data(bufferlist& bl, off_t ofs, off_t len) override;
+
+  virtual int get_params() override;
+
+private:
+  void encode_short(char* buff, uint16_t s, int& i);
+
+  void encode_int(char* buff, u_int32_t s, int& i);
+
+  int create_header_records(char* buff);
+
+  std::unique_ptr<boost::crc_32_type> crc32;
+
+  int create_message(char* buff, u_int32_t result_len, u_int32_t header_len);
+
+  int run_s3select(const char* query, const char* input, size_t input_length);
+
+  int extract_by_tag(std::string tag_name, std::string& result);
+
+  void convert_escape_seq(std::string& esc);
+
+  int handle_aws_cli_parameters(std::string& sql_query);
+};
+
 
 namespace rgw::auth::s3 {
 
diff --git a/src/s3select b/src/s3select
new file mode 160000 (submodule)
index 0000000..7ae7a12
--- /dev/null
@@ -0,0 +1 @@
+Subproject commit 7ae7a12c138d4607d6c012228c06f3802c493c49