]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW: Implement continuation, progress, stats, end s3select response
authorgal salomon <gal.salomon@gmail.com>
Fri, 7 May 2021 21:29:13 +0000 (00:29 +0300)
committergalsalomon66 <gal.salomon@gmail.com>
Sat, 18 Dec 2021 11:51:54 +0000 (13:51 +0200)
    RGW/S3select: Implement output-serialization. The user may request different CSV definitions
    for output (field delimiter, row delimiter, quote handling).

    RGW/S3select: Implement presto alignments. The presto application sends
    queries with table aliases, case-insensitive keywords, and no semicolon at
    the end of the statement.

    RGW/s3select: fix handling of zero-size objects.

Signed-off-by: Albin Antony <aantony@redhat.com>
Signed-off-by: galsalomon66 <gal.salomon@gmail.com>
qa/suites/rgw/verify/tasks/s3tests.yaml
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/s3select

index b5aef9b044b83cdb9b28cd40f31b989167d0d069..b9934fea37bbfef5e07a6f9d0de13bfce32e6a60 100644 (file)
@@ -1,5 +1,6 @@
 tasks:
 - s3tests:
     client.0:
-      force-branch: ceph-master
+      force-branch: progress-stats
+      git_remote: https://github.com/galsalomon66/
       rgw_server: client.0
index 4630652487393977611582c612c3438148edce48..426a23324bc21c26d33a3f6b03be89f6ebcdd8d8 100644 (file)
@@ -6128,156 +6128,327 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
 
 
 using namespace s3selectEngine;
-const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
-const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"};
 
-RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
-  s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
-  m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
-  m_buff_header(std::make_unique<char[]>(1000)),
-  chunk_number(0),
-  crc32(std::unique_ptr<boost::crc_32_type>())
+std::string &aws_response_handler::get_sql_result()
 {
-  set_get_data(true);
+  return sql_result;
 }
 
-RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+uint64_t aws_response_handler::get_processed_size()
 {
+  return processed_size;
 }
 
-int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
+void aws_response_handler::set_processed_size(uint64_t value)
 {
+  processed_size += value;
+}
 
-  //retrieve s3-select query from payload
-  bufferlist data;
-  int ret;
-  int max_size = 4096;
-  std::tie(ret, data) = read_all_input(s, max_size, false);
-  if (ret != 0) {
-    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
-    return ret;
-  }
+uint64_t aws_response_handler::get_total_bytes_returned()
+{
+  return total_bytes_returned;
+}
 
-  m_s3select_query = data.to_str();
-  if (m_s3select_query.length() > 0) {
-    ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
-  }
-  else {
-    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
-    return -1;
-  }
+void aws_response_handler::set_total_bytes_returned(uint64_t value)
+{
+  total_bytes_returned += value;
+}
 
-  int status = handle_aws_cli_parameters(m_sql_query);
+void aws_response_handler::push_header(const char *header_name, const char *header_value)
+{
+  char x;
+  short s;
 
-  if (status<0) {
-    return status;
-  }
+  x = char(strlen(header_name));
+  m_buff_header.append(&x, sizeof(x));
+  m_buff_header.append(header_name);
 
-  return RGWGetObj_ObjStore_S3::get_params(y);
+  x = char(7);
+  m_buff_header.append(&x, sizeof(x));
+
+  s = htons(uint16_t(strlen(header_value)));
+  m_buff_header.append(reinterpret_cast<char *>(&s), sizeof(s));
+  m_buff_header.append(header_value);
 }
 
-void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i)
+int aws_response_handler::create_header_records()
 {
-  short x = htons(s);
-  memcpy(buff, &x, sizeof(s));
-  i+=sizeof(s);
+  //headers description(AWS)
+  //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+
+  //1
+  push_header(header_name_str[EVENT_TYPE], header_value_str[RECORDS]);
+  //2
+  push_header(header_name_str[CONTENT_TYPE], header_value_str[OCTET_STREAM]);
+  //3
+  push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+  return m_buff_header.size();
 }
 
-void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i)
+int aws_response_handler::create_header_continuation()
 {
-  u_int32_t x = htonl(s);
-  memcpy(buff, &x, sizeof(s));
-  i+=sizeof(s);
+  //headers description(AWS)
+  //1
+  push_header(header_name_str[EVENT_TYPE], header_value_str[CONT]);
+  //2
+  push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+  return m_buff_header.size();
 }
 
-int RGWSelectObj_ObjStore_S3::create_header_records(char* buff)
+int aws_response_handler::create_header_progress()
 {
-  int i = 0;
+  //headers description(AWS)
+  //1
+  push_header(header_name_str[EVENT_TYPE], header_value_str[PROGRESS]);
+  //2
+  push_header(header_name_str[CONTENT_TYPE], header_value_str[XML]);
+  //3
+  push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+  return m_buff_header.size();
+}
 
+int aws_response_handler::create_header_stats()
+{
   //headers description(AWS)
-  //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
-  
   //1
-  buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
-  memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
-  i += strlen(header_name_str[EVENT_TYPE]);
-  buff[i++] = char(7);
-  encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
-  memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
-  i += strlen(header_value_str[RECORDS]);
+  push_header(header_name_str[EVENT_TYPE], header_value_str[STATS]);
+  //2
+  push_header(header_name_str[CONTENT_TYPE], header_value_str[XML]);
+  //3
+  push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+  return m_buff_header.size();
+}
 
+int aws_response_handler::create_header_end()
+{
+  //headers description(AWS)
+  //1
+  push_header(header_name_str[EVENT_TYPE], header_value_str[END]);
   //2
-  buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
-  memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
-  i += strlen(header_name_str[CONTENT_TYPE]);
-  buff[i++] = char(7);
-  encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
-  memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
-  i += strlen(header_value_str[OCTET_STREAM]);
+  push_header(header_name_str[MESSAGE_TYPE], header_value_str[EVENT]);
+
+  return m_buff_header.size();
+}
+
+int aws_response_handler::create_error_header_records(const char *error_message)
+{
+  //headers description(AWS)
+  //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
 
+  //1
+  push_header(header_name_str[ERROR_CODE], header_value_str[ENGINE_ERROR]);
+  //2
+  push_header(header_name_str[ERROR_MESSAGE], error_message);
   //3
-  buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
-  memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
-  i += strlen(header_name_str[MESSAGE_TYPE]);
-  buff[i++] = char(7);
-  encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
-  memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
-  i += strlen(header_value_str[EVENT]);
+  push_header(header_name_str[MESSAGE_TYPE], header_value_str[ERROR_TYPE]);
 
-  return i;
+  return m_buff_header.size();
 }
 
-int RGWSelectObj_ObjStore_S3::create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len)
+int aws_response_handler::create_message(u_int32_t header_len)
 {
-  //message description(AWS): 
+  //message description(AWS):
   //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
-  //s3select result is produced into m_result, the m_result is also the response-message, thus the attach headers and CRC 
+  //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
   //are created later to the produced SQL result, and actually wrapping the payload.
 
+  auto push_encode_int = [&](u_int32_t s, int pos) {
+    u_int32_t x = htonl(s);
+    sql_result.replace(pos, sizeof(x), reinterpret_cast<char *>(&x), sizeof(x));
+  };
+
   u_int32_t total_byte_len = 0;
   u_int32_t preload_crc = 0;
   u_int32_t message_crc = 0;
-  int i = 0;
-  char * buff = out_string.data();
 
-  if(crc32 ==0) {
-    // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
-    crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
-  }
+  total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
 
-  total_byte_len = result_len + 16;//the total is greater in 4 bytes than current size
+  push_encode_int(total_byte_len, 0);
+  push_encode_int(header_len, 4);
 
-  encode_int(&buff[i], total_byte_len, i);//store sizes at the beginning of the buffer
-  encode_int(&buff[i], header_len, i);
+  crc32.reset();
+  crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
+  preload_crc = crc32();
+  push_encode_int(preload_crc, 8);
 
-  crc32->reset();
-  *crc32 = std::for_each( buff, buff + 8, *crc32 );//crc for starting 8 bytes
-  preload_crc = (*crc32)();
-  encode_int(&buff[i], preload_crc, i);
+  crc32.reset();
+  crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
+  message_crc = crc32();
 
-  i += result_len;//advance to the end of payload.
+  u_int32_t x = htonl(message_crc);
+  sql_result.append(reinterpret_cast<char *>(&x), sizeof(x));
 
-  crc32->reset();
-  *crc32 = std::for_each( buff, buff + i, *crc32 );//crc for payload + checksum
-  message_crc = (*crc32)();
-  char out_encode[4];
-  encode_int(out_encode, message_crc, i);
-  out_string.append(out_encode,sizeof(out_encode));
+  return sql_result.size();
+}
+
+void aws_response_handler::init_response()
+{ //12 positions for header-crc
+  sql_result.resize(header_crc_size,'\0');
+}
+
+void aws_response_handler::init_success_response()
+{
+  m_buff_header.clear();
+  header_size = create_header_records();
+  sql_result.append(m_buff_header.c_str(), header_size);
+  sql_result.append(PAYLOAD_LINE);
+}
+
+void aws_response_handler::send_continuation_response()
+{
+  sql_result.resize(header_crc_size,'\0');
+  m_buff_header.clear();
+  header_size = create_header_continuation();
+  sql_result.append(m_buff_header.c_str(), header_size);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
 
-  return i;
+void aws_response_handler::init_progress_response()
+{
+  sql_result.resize(header_crc_size,'\0');
+  m_buff_header.clear();
+  header_size = create_header_progress();
+  sql_result.append(m_buff_header.c_str(), header_size);
 }
 
-#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
-#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"
+void aws_response_handler::init_stats_response()
+{
+  sql_result.resize(header_crc_size,'\0');
+  m_buff_header.clear();
+  header_size = create_header_stats();
+  sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::init_end_response()
+{
+  sql_result.resize(header_crc_size,'\0');
+  m_buff_header.clear();
+  header_size = create_header_end();
+  sql_result.append(m_buff_header.c_str(), header_size);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::init_error_response(const char *error_message)
+{ //currently not in use. the headers in the case of error, are not extracted by AWS-cli.
+  m_buff_header.clear();
+  header_size = create_error_header_records(error_message);
+  sql_result.append(m_buff_header.c_str(), header_size);
+}
+
+void aws_response_handler::send_success_response()
+{
+  sql_result.append(END_PAYLOAD_LINE);
+  int buff_len = create_message(header_size);
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_error_response(const char *error_code,
+                                               const char *error_message,
+                                               const char *resource_id)
+{
+
+  set_req_state_err(s, 0); //TODO what err_no?
+  dump_errno(s, 400);
+  end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
+  dump_start(s);
+
+  s->formatter->open_object_section("Error");
+
+  s->formatter->dump_string("Code", error_code);
+  s->formatter->dump_string("Message", error_message);
+  s->formatter->dump_string("Resource", "#Resource#");
+  s->formatter->dump_string("RequestId", resource_id);
+
+  s->formatter->close_section();
+
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_progress_response()
+{
+  std::string progress_payload="<?xml version=\"1.0\" encoding=\"UTF-8\"?><Progress><BytesScanned>" + to_string(get_processed_size()) + 
+                   "</BytesScanned><BytesProcessed>" + to_string(get_processed_size()) + "</BytesProcessed>" +
+                   "<BytesReturned>" + to_string(get_total_bytes_returned()) + "</BytesReturned></Progress>";
+
+  sql_result.append(progress_payload);
+  int buff_len = create_message(header_size);
+
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+void aws_response_handler::send_stats_response()
+{
+  std::string stats_payload="<?xml version=\"1.0\" encoding=\"UTF-8\"?><Stats><BytesScanned>" + to_string(get_processed_size()) + 
+                   "</BytesScanned><BytesProcessed>" + to_string(get_processed_size()) + "</BytesProcessed>" +
+                   "<BytesReturned>" + to_string(get_total_bytes_returned()) + "</BytesReturned></Stats>";
+
+  sql_result.append(stats_payload);
+  int buff_len = create_message(header_size);
+
+  s->formatter->write_bin_data(sql_result.data(), buff_len);
+  rgw_flush_formatter_and_reset(s, s->formatter);
+}
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+                                                       m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
+                                                       chunk_number(0)
+{
+  set_get_data(true);
+}
+
+RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
+{
+}
+
+int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
+{
+
+  //retrieve s3-select query from payload
+  bufferlist data;
+  int ret;
+  int max_size = 4096;
+  std::tie(ret, data) = read_all_input(s, max_size, false);
+  if (ret != 0) {
+    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
+    return ret;
+  }
+
+  m_s3select_query = data.to_str();
+  if (m_s3select_query.length() > 0) {
+    ldpp_dout(this, 10) << "s3-select query: " << m_s3select_query << dendl;
+  }
+  else {
+    ldpp_dout(this, 10) << "s3-select query: failed to retrieve query;" << dendl;
+    return -1;
+  }
+
+  int status = handle_aws_cli_parameters(m_sql_query);
+
+  if (status<0) {
+    return status;
+  }
+
+  return RGWGetObj_ObjStore_S3::get_params(y);
+}
 
 int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
 {
   int status = 0;
+  uint32_t length_before_processing, length_post_processing;
   csv_object::csv_defintions csv;
-
-  m_result = "012345678901"; //12 positions for header-crc
-
-  int header_size = 0;
+  const char* s3select_syntax_error = "s3select-Syntax-Error";
+  const char* s3select_resource_id = "resourcse-id";
+  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
 
   if (m_s3_csv_object==0) {
     s3select_syntax->parse_query(query);
@@ -6298,47 +6469,109 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input,
       csv.escape_char = *m_escape_char.c_str();
     }
 
+    if (m_enable_progress.compare("true")==0) {
+      enable_progress = true;
+    } else {
+      enable_progress = false;
+    }
+
+    if (output_row_delimiter.size()) {
+      csv.output_row_delimiter = *output_row_delimiter.c_str();
+    }
+
+    if (output_column_delimiter.size()) {
+      csv.output_column_delimiter = *output_column_delimiter.c_str();
+    }
+
+    if (output_quot.size()) {
+      csv.output_quot_char = *output_quot.c_str();
+    }
+
+    if (output_escape_char.size()) {
+      csv.output_escape_char = *output_escape_char.c_str();
+    }
+
+    if(output_quote_fields.compare("ALWAYS") == 0) {
+      csv.quote_fields_always = true;
+    }
+    else if(output_quote_fields.compare("ASNEEDED") == 0) {
+      csv.quote_fields_asneeded = true;
+    }
+
     if(m_header_info.compare("IGNORE")==0) {
       csv.ignore_header_info=true;
     }
     else if(m_header_info.compare("USE")==0) {
       csv.use_header_info=true;
     }
-
     m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
   }
 
-  header_size = create_header_records(m_buff_header.get());
-  m_result.append(m_buff_header.get(), header_size);
-  m_result.append(PAYLOAD_LINE);
+  m_aws_response_handler->init_response();
+
+  if (s3select_syntax->get_error_description().empty() == false)
+  { //error-flow (syntax-error)
+    m_aws_response_handler->send_error_response(s3select_syntax_error,
+                                                      s3select_syntax->get_error_description().c_str(),
+                                                      s3select_resource_id);
 
-  if (s3select_syntax->get_error_description().empty() == false) {
-    m_result.append(s3select_syntax->get_error_description());
-    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl;
-    status = -1;
+    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl;
+    return -1;
   }
-  else {
-    status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
-    if(status<0) {
-      m_result.append(m_s3_csv_object->get_error_description());
+  else
+  {
+    if (input == nullptr) {
+      input = "";
     }
-  }
+    m_aws_response_handler->init_success_response();
+    length_before_processing = (m_aws_response_handler->get_sql_result()).size();
+
+    //query is correct(syntax), processing is starting.
+    status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler->get_sql_result(), input, input_length, s->obj_size);
+    length_post_processing = (m_aws_response_handler->get_sql_result()).size();
+    m_aws_response_handler->set_total_bytes_returned(length_post_processing-length_before_processing);
+    if (status < 0)
+    { //error flow(processing-time)
+      m_aws_response_handler->send_error_response(s3select_processTime_error,
+                                                        m_s3_csv_object->get_error_description().c_str(),
+                                                        s3select_resource_id);
 
-  if (m_result.size() > strlen(PAYLOAD_LINE)) {
-    m_result.append(END_PAYLOAD_LINE);
-    int buff_len = create_message(m_result, m_result.size() - 12, header_size);
-    s->formatter->write_bin_data(m_result.data(), buff_len);
-    if (op_ret < 0) {
-      return op_ret;
+      ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl;
+      return -1;
     }
+
+    if (chunk_number == 0)
+    {//success flow
+      if (op_ret < 0)
+      {
+        set_req_state_err(s, op_ret);
+      }
+      dump_errno(s);
+      // Explicitly use chunked transfer encoding so that we can stream the result
+      // to the user without having to wait for the full length of it.
+      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
+    }
+    chunk_number++;
+  }
+  
+  if (length_post_processing-length_before_processing != 0) {
+    m_aws_response_handler->send_success_response();
+  } else {
+    m_aws_response_handler->send_continuation_response();
+  }
+  
+  if (enable_progress == true) {
+    m_aws_response_handler->init_progress_response();
+    m_aws_response_handler->send_progress_response();
   }
-  rgw_flush_formatter_and_reset(s, s->formatter);
 
   return status;
 }
 
 int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
 {
+  std::string input_tag{"InputSerialization"};
+  std::string output_tag{"OutputSerialization"};
 
   if(chunk_number !=0) {
     return 0;
@@ -6354,65 +6587,82 @@ int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string& sql_query)
   }
 
   //AWS cli s3select parameters
-  extract_by_tag("Expression", sql_query);
-  extract_by_tag("FieldDelimiter", m_column_delimiter);
-  extract_by_tag("QuoteCharacter", m_quot);
-  extract_by_tag("RecordDelimiter", m_row_delimiter);
+  extract_by_tag(m_s3select_query, "Expression", sql_query);
+  extract_by_tag(m_s3select_query, "Enabled", m_enable_progress);
+  
+  size_t _qi = m_s3select_query.find("<" + input_tag + ">", 0);
+  size_t _qe = m_s3select_query.find("</" + input_tag + ">", _qi);
+  m_s3select_input = m_s3select_query.substr(_qi + input_tag.size() + 2, _qe - (_qi + input_tag.size() + 2));
+
+  extract_by_tag(m_s3select_input,"FieldDelimiter", m_column_delimiter);
+  extract_by_tag(m_s3select_input, "QuoteCharacter", m_quot);
+  extract_by_tag(m_s3select_input, "RecordDelimiter", m_row_delimiter);
+  extract_by_tag(m_s3select_input, "FileHeaderInfo", m_header_info);
+
   if (m_row_delimiter.size()==0) {
     m_row_delimiter='\n';
   }
+  else if(m_row_delimiter.compare("&#10;") == 0)
+  {//presto change
+    m_row_delimiter='\n';
+  }
+
+  extract_by_tag(m_s3select_input, "QuoteEscapeCharacter", m_escape_char);
+  extract_by_tag(m_s3select_input, "CompressionType", m_compression_type);
+
+  size_t _qo = m_s3select_query.find("<" + output_tag + ">", 0);
+  size_t _qs = m_s3select_query.find("</" + output_tag + ">", _qi);
+  m_s3select_output = m_s3select_query.substr(_qo + output_tag.size() + 2, _qs - (_qo + output_tag.size() + 2));
+  
+  extract_by_tag(m_s3select_output, "FieldDelimiter", output_column_delimiter);
+  extract_by_tag(m_s3select_output, "QuoteCharacter", output_quot);
+  extract_by_tag(m_s3select_output, "QuoteEscapeCharacter", output_escape_char);
+  extract_by_tag(m_s3select_output, "QuoteFields", output_quote_fields);
+  extract_by_tag(m_s3select_output, "RecordDelimiter", output_row_delimiter);
 
-  extract_by_tag("QuoteEscapeCharacter", m_escape_char);
-  extract_by_tag("CompressionType", m_compression_type);
   if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0) {
     ldpp_dout(this, 10) << "RGW supports currently only NONE option for compression type" << dendl;
     return -1;
   }
 
-  extract_by_tag("FileHeaderInfo", m_header_info);
-
   return 0;
 }
 
-int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string tag_name, std::string& result)
+int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string input, std::string tag_name, std::string& result)
 {
   result = "";
-  size_t _qs = m_s3select_query.find("<" + tag_name + ">", 0) + tag_name.size() + 2;
+  size_t _qs = input.find("<" + tag_name + ">", 0);
+  size_t qs_input = _qs + tag_name.size() + 2;
   if (_qs == std::string::npos) {
     return -1;
   }
-  size_t _qe = m_s3select_query.find("</" + tag_name + ">", _qs);
+  size_t _qe = input.find("</" + tag_name + ">", qs_input);
   if (_qe == std::string::npos) {
     return -1;
   }
 
-  result = m_s3select_query.substr(_qs, _qe - _qs);
+  result = input.substr(qs_input, _qe - qs_input);
 
   return 0;
 }
 
 int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_t len)
 {
-  if (len == 0) {
-    return 0;
+  int status=0;
+  if (m_aws_response_handler == nullptr) {
+    m_aws_response_handler = std::make_unique<aws_response_handler>(s,this);
   }
-
-  if (chunk_number == 0) {
-    if (op_ret < 0) {
-      set_req_state_err(s, op_ret);
-    }
-    dump_errno(s);
+  
+  if(len == 0 && s->obj_size != 0) {
+    return 0;
   }
+  
+  if (s->obj_size == 0) {
+    status = run_s3select(m_sql_query.c_str(), nullptr, 0);  
+  } else {
 
   auto bl_len = bl.get_num_buffers();
 
-  // Explicitly use chunked transfer encoding so that we can stream the result
-  // to the user without having to wait for the full length of it.
-  if (chunk_number == 0) {
-    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
-  }
-
-  int status=0;
   int i=0;
 
   for(auto& it : bl.buffers()) {
@@ -6425,15 +6675,23 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
                         <<  " obj-size " << s->obj_size << dendl;
       continue; 
     }
+    
+    m_aws_response_handler->set_processed_size(it.length());
 
     status = run_s3select(m_sql_query.c_str(), &(it)[0], it.length());
     if(status<0) {
       break;
-    }
+    } 
     i++;
   }
+  }
 
-  chunk_number++;
-
+  if (m_aws_response_handler->get_processed_size() == s->obj_size) {
+    if (status >=0) {
+    m_aws_response_handler->init_stats_response();
+    m_aws_response_handler->send_stats_response();
+    m_aws_response_handler->init_end_response();
+    }
+  }
   return status;
 }
index fd539a2553b23fb5b4d50bea8286d12f4545bfd3..92a7cf2288b7d819e60f8dfa904b56a8254b0f8e 100644 (file)
@@ -914,42 +914,136 @@ class s3select;
 class csv_object;
 }
 
+
+class aws_response_handler
+{//TODO this class should reside on s3select submodule 
+
+private:
+  std::string sql_result;
+  struct req_state *s;//TODO will be replace by callback
+  uint32_t header_size;
+  // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
+  boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true> crc32;
+  RGWOp *m_rgwop;
+  std::string m_buff_header;
+  uint64_t total_bytes_returned;
+  uint64_t processed_size;
+
+  enum header_name_En
+  {
+    EVENT_TYPE,
+    CONTENT_TYPE,
+    MESSAGE_TYPE,
+    ERROR_CODE,
+    ERROR_MESSAGE
+  };
+
+  enum header_value_En
+  {
+    RECORDS,
+    OCTET_STREAM,
+    EVENT,
+    CONT,
+    PROGRESS,
+    END,
+    XML,
+    STATS,
+    ENGINE_ERROR,
+    ERROR_TYPE
+  };
+
+  const char *PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
+  const char *END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
+  const char *header_name_str[5] =  {":event-type", ":content-type", ":message-type",":error-code",":error-message"};
+  const char *header_value_str[10] = {"Records", "application/octet-stream", "event", "Cont", "Progress", "End", "text/xml", "Stats", "s3select-engine-error","error"};
+  static constexpr size_t header_crc_size = 12;
+
+  void push_header(const char * header_name,const char* header_value);
+
+  int create_message(u_int32_t header_len);
+
+public:
+  //12 positions for header-crc
+  aws_response_handler(struct req_state *ps,RGWOp *rgwop) : sql_result("012345678901"), s(ps),m_rgwop(rgwop),total_bytes_returned{0},processed_size{0}
+  { }
+
+  std::string &get_sql_result();
+
+  uint64_t get_processed_size();
+
+  void set_processed_size(uint64_t value);
+
+  uint64_t get_total_bytes_returned();
+
+  void set_total_bytes_returned(uint64_t value);
+
+  int create_header_records();
+
+  int create_header_continuation();
+
+  int create_header_progress();
+
+  int create_header_stats();
+
+  int create_header_end();
+
+  int create_error_header_records(const char* error_message);
+
+  void init_response();
+
+  void init_success_response();
+
+  void send_continuation_response();
+
+  void init_progress_response();
+
+  void init_end_response();
+
+  void init_stats_response();
+
+  void init_error_response(const char* error_message);
+
+  void send_success_response();
+
+  void send_progress_response();
+
+  void send_stats_response();
+
+  void send_error_response(const char* error_code,
+                          const char* error_message,
+                          const char* resource_id);
+
+}; //end class aws_response_handler
+
 class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
 {
 
 private:
   std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
   std::string m_s3select_query;
-  std::string m_result;
+  std::string m_s3select_input;
+  std::string m_s3select_output;
   std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
   std::string m_column_delimiter;
   std::string m_quot;
   std::string m_row_delimiter;
   std::string m_compression_type;
   std::string m_escape_char;
-  std::unique_ptr<char[]>  m_buff_header;
   std::string m_header_info;
   std::string m_sql_query;
+  std::string m_enable_progress;
+  std::string output_column_delimiter;
+  std::string output_quot;
+  std::string output_escape_char;
+  std::string output_quote_fields;
+  std::string output_row_delimiter;
+
+  std::unique_ptr<aws_response_handler> m_aws_response_handler;
+  bool enable_progress;
 
 public:
   unsigned int chunk_number;
 
-  enum header_name_En
-  {
-    EVENT_TYPE,
-    CONTENT_TYPE,
-    MESSAGE_TYPE
-  };
-  static const char* header_name_str[3];
-
-  enum header_value_En
-  {
-    RECORDS,
-    OCTET_STREAM,
-    EVENT
-  };
-  static const char* header_value_str[3];
-
   RGWSelectObj_ObjStore_S3();
   virtual ~RGWSelectObj_ObjStore_S3();
 
@@ -958,19 +1052,10 @@ public:
   virtual int get_params(optional_yield y) override;
 
 private:
-  void encode_short(char* buff, uint16_t s, int& i);
-
-  void encode_int(char* buff, u_int32_t s, int& i);
-
-  int create_header_records(char* buff);
-
-  std::unique_ptr<boost::crc_32_type> crc32;
-
-  int create_message(std::string&, u_int32_t result_len, u_int32_t header_len);
 
   int run_s3select(const char* query, const char* input, size_t input_length);
 
-  int extract_by_tag(std::string tag_name, std::string& result);
+  int extract_by_tag(std::string input, std::string tag_name, std::string& result);
 
   void convert_escape_seq(std::string& esc);
 
index 63129ea4d2777204d0ddc2786c11062b6884a88b..922561ce6bb9017b85d74b9a1a43a52af1ed65ae 160000 (submodule)
@@ -1 +1 @@
-Subproject commit 63129ea4d2777204d0ddc2786c11062b6884a88b
+Subproject commit 922561ce6bb9017b85d74b9a1a43a52af1ed65ae