git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
s3select: refactor response handling (*) a dedicated object handles the s3select response
author gal salomon <gal.salomon@gmail.com>
Fri, 7 May 2021 21:29:13 +0000 (00:29 +0300)
committer galsalomon66 <gal.salomon@gmail.com>
Sun, 7 Nov 2021 14:26:28 +0000 (16:26 +0200)
Signed-off-by: gal salomon <gal.salomon@gmail.com>
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index e3b40bc75a64e7741e78bf7b6d7ab74eead7b6d0..9ffcdfd30d02dd186e829593e8d8224f2bf844d0 100644 (file)
@@ -6127,15 +6127,198 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(
 
 
 using namespace s3selectEngine;
-const char* RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
-const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"};
 
-RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
-  s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
-  m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
-  m_buff_header(std::make_unique<char[]>(1000)),
-  chunk_number(0),
-  crc32(std::unique_ptr<boost::crc_32_type>())
+/**
+ * aws_response_handler builds and sends the s3select reply of a request.
+ *
+ * Success flow: the SQL result is produced directly into sql_result, which is
+ * also the outgoing message. Its first 12 bytes are a placeholder that
+ * create_message() later overwrites with
+ * [total-byte-length:4][header-byte-length:4][crc:4]; the headers and the
+ * payload follow, and a trailing message CRC is appended last.
+ *
+ * Error flow: send_error_response() replies with HTTP status 400 and an
+ * "Error" section instead of a framed message.
+ */
+class aws_response_handler
+{//TODO this class should reside on s3select submodule
+
+private:
+  std::string sql_result; //the outgoing message: 12-byte placeholder, headers, payload
+  struct req_state *s;//TODO will be replace by callback
+  uint32_t header_size = 0; //byte length of the serialized headers of the current message
+  std::unique_ptr<boost::crc_32_type> crc32;
+  RGWOp *m_rgwop;
+  std::string m_buff_header; //scratch buffer the headers are serialized into
+
+  enum header_name_En
+  {
+    EVENT_TYPE,
+    CONTENT_TYPE,
+    MESSAGE_TYPE,
+    ERROR_CODE,
+    ERROR_MESSAGE
+
+  };
+
+  enum header_value_En
+  {
+    RECORDS,
+    OCTET_STREAM,
+    EVENT,
+    ENGINE_ERROR,
+    ERROR_TYPE
+  };
+
+  const char *PAYLOAD_LINE= "\n<Payload>\n<Records>\n<Payload>\n";
+  const char *END_PAYLOAD_LINE= "\n</Payload></Records></Payload>";
+  //indexed by header_name_En; the error header names carry the same ':' prefix
+  //as the other headers, as the AWS event-stream error message defines them.
+  const char *header_name_str[5] =  {":event-type", ":content-type", ":message-type",":error-code",":error-message"};
+  //indexed by header_value_En
+  const char *header_value_str[5] = {"Records", "application/octet-stream", "event","s3select-engine-error","error"};
+
+public:
+  /**
+   * @param ps    request state the response is written to
+   * @param rgwop operation handed to end_header() when an error reply is sent
+   */
+  //12 positions for header-crc
+  aws_response_handler(struct req_state *ps,RGWOp *rgwop) :
+    sql_result("012345678901"),
+    s(ps),
+    // boost::crc_32_type is crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>;
+    // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
+    crc32(std::make_unique<boost::crc_32_type>()),
+    m_rgwop(rgwop)
+  {
+  }
+
+  /** @return the message buffer; run_s3select hands it to the engine, which appends the result to it. */
+  std::string &get_sql_result()
+  {
+    return sql_result;
+  }
+
+  /**
+   * Serializes one header into m_buff_header as
+   * [name-length:1][name][value-type:1][value-length:2, network order][value].
+   *
+   * The length fields are 1 and 2 bytes wide, so the name is limited to 255
+   * bytes and the value to 65535 bytes; longer input is truncated so that the
+   * encoded length always equals the number of bytes actually appended.
+   */
+  void push_header(const char * header_name,const char* header_value)
+  {
+    const size_t name_len = std::min<size_t>(strlen(header_name), 0xFF);
+    const size_t value_len = std::min<size_t>(strlen(header_value), 0xFFFF);
+
+    m_buff_header.push_back(char(name_len));
+    m_buff_header.append(header_name, name_len);
+
+    //header-value-type 7 (string, per the AWS event-stream encoding)
+    m_buff_header.push_back(char(7));
+
+    const uint16_t encoded_value_len = htons(uint16_t(value_len));
+    m_buff_header.append(reinterpret_cast<const char*>(&encoded_value_len), sizeof(encoded_value_len));
+    m_buff_header.append(header_value, value_len);
+  }
+
+  /**
+   * Appends the three headers of a "Records" event to m_buff_header.
+   * @return the size of m_buff_header after the headers were appended
+   */
+  int create_header_records()
+  {
+    //headers description(AWS)
+    //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+
+    //1
+    push_header(header_name_str[EVENT_TYPE],header_value_str[RECORDS]);
+    //2
+    push_header(header_name_str[CONTENT_TYPE],header_value_str[OCTET_STREAM]);
+    //3
+    push_header(header_name_str[MESSAGE_TYPE],header_value_str[EVENT]);
+
+    return m_buff_header.size();
+  }
+
+  /**
+   * Appends the three headers of an error message to m_buff_header.
+   * @param error_message text stored as the value of the error-message header
+   * @return the size of m_buff_header after the headers were appended
+   */
+  int create_error_header_records(const char* error_message)
+  {
+    //headers description(AWS)
+    //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
+
+    //1
+    push_header(header_name_str[ERROR_CODE],header_value_str[ENGINE_ERROR]);
+    //2
+    push_header(header_name_str[ERROR_MESSAGE],error_message);
+    //3
+    push_header(header_name_str[MESSAGE_TYPE],header_value_str[ERROR_TYPE]);
+
+    return m_buff_header.size();
+  }
+
+  /**
+   * Finalizes sql_result into a complete message: fills the 12-byte
+   * placeholder at its start and appends the trailing message CRC.
+   *
+   * @param header_len byte length of the headers already stored in sql_result
+   * @return the size of sql_result after the trailing CRC was appended
+   */
+  int create_message(u_int32_t header_len)
+  {
+    //message description(AWS):
+    //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
+    //s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
+    //are created later to the produced SQL result, and actually wrapping the payload.
+
+    //overwrite 4 bytes of sql_result at pos with value in network byte order
+    auto push_encode_int = [&](u_int32_t value,int pos)
+    {
+      u_int32_t encoded = htonl(value);
+      sql_result.replace(pos,sizeof(encoded),reinterpret_cast<char*>(&encoded),sizeof(encoded));
+    };
+
+    const u_int32_t total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
+
+    push_encode_int(total_byte_len,0);
+    push_encode_int(header_len,4);
+
+    crc32->reset();
+    *crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, *crc32); //crc for starting 8 bytes
+    const u_int32_t preload_crc = (*crc32)();
+    push_encode_int(preload_crc,8);
+
+    crc32->reset();
+    *crc32 = std::for_each(sql_result.begin(),  sql_result.end(), *crc32); //crc for payload + checksum
+    const u_int32_t message_crc = (*crc32)();
+
+    const u_int32_t encoded_message_crc = htonl(message_crc);
+    sql_result.append(reinterpret_cast<const char*>(&encoded_message_crc), sizeof(encoded_message_crc));
+
+    return sql_result.size();
+  }
+
+  /** Resets sql_result to the bare 12-byte placeholder. */
+  void init_response()
+  { //12 positions for header-crc
+    sql_result = "012345678901";
+  }
+
+  /** Appends the "Records" event headers and the opening payload line to sql_result. */
+  void init_success_response()
+  {
+    m_buff_header.clear();
+    header_size = create_header_records();
+    //m_buff_header holds exactly header_size bytes (it was cleared above)
+    sql_result.append(m_buff_header);
+    sql_result.append(PAYLOAD_LINE);
+  }
+
+  /**
+   * Appends the error headers to sql_result.
+   * @param error_message text stored as the value of the error-message header
+   */
+  void init_error_response(const char* error_message)
+  {//currently not in use. the headers in the case of error, are not extracted by AWS-cli.
+    m_buff_header.clear();
+    header_size = create_error_header_records(error_message);
+    //append the whole header buffer; append(str, pos) would instead copy the
+    //(empty) tail of m_buff_header that starts at offset header_size.
+    sql_result.append(m_buff_header);
+  }
+
+  /** Closes the payload, frames sql_result as a message, writes it to the formatter and flushes. */
+  void send_success_response()
+  {
+    //NOTE(review): sql_result also holds the 12-byte placeholder and the headers,
+    //so this holds whenever init_success_response() ran, even for an empty result;
+    //confirm whether an empty result is meant to be sent.
+    if (sql_result.size() > strlen(PAYLOAD_LINE))
+    {
+      sql_result.append(END_PAYLOAD_LINE);
+      int buff_len = create_message(header_size);
+      s->formatter->write_bin_data(sql_result.data(), buff_len);
+    }
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+
+  /**
+   * Replies with HTTP status 400 and an "Error" section declared as application/xml.
+   *
+   * @param error_code    dumped as the "Code" field
+   * @param error_message dumped as the "Message" field
+   * @param resource_id   dumped as the "RequestId" field ("Resource" is a fixed placeholder)
+   */
+  void send_error_response(const char* error_code,
+                          const char* error_message,
+                          const char* resource_id)
+  {
+
+    set_req_state_err(s, 0);//TODO what err_no?
+    dump_errno(s, 400);
+    end_header(s, m_rgwop, "application/xml", CHUNKED_TRANSFER_ENCODING);
+    dump_start(s);
+
+    s->formatter->open_object_section("Error");
+
+    s->formatter->dump_string("Code", error_code);
+    s->formatter->dump_string("Message", error_message);
+    s->formatter->dump_string("Resource", "#Resource#");
+    s->formatter->dump_string("RequestId", resource_id);
+
+    s->formatter->close_section();
+
+    rgw_flush_formatter_and_reset(s, s->formatter);
+  }
+
+}; //end class aws_response_handler
+
+RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3() : s3select_syntax(std::make_unique<s3selectEngine::s3select>()), // parser object that run_s3select feeds the query to
+                                                       m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()), // starts empty; run_s3select creates it on its first call
+                                                       chunk_number(0) // incremented by run_s3select after each successfully processed chunk
 {
   set_get_data(true);
 }
@@ -6175,108 +6358,13 @@ int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
   return RGWGetObj_ObjStore_S3::get_params(y);
 }
 
-void RGWSelectObj_ObjStore_S3::encode_short(char* buff, uint16_t s, int& i)
-{
-  short x = htons(s);
-  memcpy(buff, &x, sizeof(s));
-  i+=sizeof(s);
-}
-
-void RGWSelectObj_ObjStore_S3::encode_int(char* buff, u_int32_t s, int& i)
-{
-  u_int32_t x = htonl(s);
-  memcpy(buff, &x, sizeof(s));
-  i+=sizeof(s);
-}
-
-int RGWSelectObj_ObjStore_S3::create_header_records(char* buff)
-{
-  int i = 0;
-
-  //headers description(AWS)
-  //[header-name-byte-length:1][header-name:variable-length][header-value-type:1][header-value:variable-length]
-  
-  //1
-  buff[i++] = char(strlen(header_name_str[EVENT_TYPE]));
-  memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
-  i += strlen(header_name_str[EVENT_TYPE]);
-  buff[i++] = char(7);
-  encode_short(&buff[i], uint16_t(strlen(header_value_str[RECORDS])), i);
-  memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
-  i += strlen(header_value_str[RECORDS]);
-
-  //2
-  buff[i++] = char(strlen(header_name_str[CONTENT_TYPE]));
-  memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
-  i += strlen(header_name_str[CONTENT_TYPE]);
-  buff[i++] = char(7);
-  encode_short(&buff[i], uint16_t(strlen(header_value_str[OCTET_STREAM])), i);
-  memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
-  i += strlen(header_value_str[OCTET_STREAM]);
-
-  //3
-  buff[i++] = char(strlen(header_name_str[MESSAGE_TYPE]));
-  memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
-  i += strlen(header_name_str[MESSAGE_TYPE]);
-  buff[i++] = char(7);
-  encode_short(&buff[i], uint16_t(strlen(header_value_str[EVENT])), i);
-  memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
-  i += strlen(header_value_str[EVENT]);
-
-  return i;
-}
-
-int RGWSelectObj_ObjStore_S3::create_message(std::string &out_string, u_int32_t result_len, u_int32_t header_len)
-{
-  //message description(AWS): 
-  //[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
-  //s3select result is produced into m_result, the m_result is also the response-message, thus the attach headers and CRC 
-  //are created later to the produced SQL result, and actually wrapping the payload.
-
-  u_int32_t total_byte_len = 0;
-  u_int32_t preload_crc = 0;
-  u_int32_t message_crc = 0;
-  int i = 0;
-  char * buff = out_string.data();
-
-  if(crc32 ==0) {
-    // the parameters are according to CRC-32 algorithm and its aligned with AWS-cli checksum
-    crc32 = std::unique_ptr<boost::crc_32_type>(new boost::crc_optimal<32, 0x04C11DB7, 0xFFFFFFFF, 0xFFFFFFFF, true, true>);
-  }
-
-  total_byte_len = result_len + 16;//the total is greater in 4 bytes than current size
-
-  encode_int(&buff[i], total_byte_len, i);//store sizes at the beginning of the buffer
-  encode_int(&buff[i], header_len, i);
-
-  crc32->reset();
-  *crc32 = std::for_each( buff, buff + 8, *crc32 );//crc for starting 8 bytes
-  preload_crc = (*crc32)();
-  encode_int(&buff[i], preload_crc, i);
-
-  i += result_len;//advance to the end of payload.
-
-  crc32->reset();
-  *crc32 = std::for_each( buff, buff + i, *crc32 );//crc for payload + checksum
-  message_crc = (*crc32)();
-  char out_encode[4];
-  encode_int(out_encode, message_crc, i);
-  out_string.append(out_encode,sizeof(out_encode));
-
-  return i;
-}
-
-#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
-#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"
-
 int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input, size_t input_length)
 {
   int status = 0;
   csv_object::csv_defintions csv;
-
-  m_result = "012345678901"; //12 positions for header-crc
-
-  int header_size = 0;
+  const char* s3select_syntax_error = "s3select-Syntax-Error";
+  const char* s3select_resource_id = "resourcse-id";
+  const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
 
   if (m_s3_csv_object==0) {
     s3select_syntax->parse_query(query);
@@ -6304,34 +6392,54 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input,
       csv.use_header_info=true;
     }
 
+    m_aws_response_handler = std::make_unique<aws_response_handler>(s,this);
     m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
   }
 
-  header_size = create_header_records(m_buff_header.get());
-  m_result.append(m_buff_header.get(), header_size);
-  m_result.append(PAYLOAD_LINE);
+  m_aws_response_handler.get()->init_response();
 
-  if (s3select_syntax->get_error_description().empty() == false) {
-    m_result.append(s3select_syntax->get_error_description());
-    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl;
-    status = -1;
+  if (s3select_syntax->get_error_description().empty() == false)
+  { //error-flow (syntax-error)
+    m_aws_response_handler.get()->send_error_response(s3select_syntax_error,
+                                                      s3select_syntax->get_error_description().c_str(),
+                                                      s3select_resource_id);
+
+    ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}" << dendl;
+    return -1;
   }
-  else {
-    status = m_s3_csv_object->run_s3select_on_stream(m_result, input, input_length, s->obj_size);
-    if(status<0) {
-      m_result.append(m_s3_csv_object->get_error_description());
+  else
+  {
+
+    m_aws_response_handler.get()->init_success_response();
+
+    //query is correct(syntax), processing is starting.
+    status = m_s3_csv_object->run_s3select_on_stream(m_aws_response_handler.get()->get_sql_result(), input, input_length, s->obj_size);
+    if (status < 0)
+    { //error flow(processing-time)
+      m_aws_response_handler.get()->send_error_response(s3select_processTime_error,
+                                                        m_s3_csv_object->get_error_description().c_str(),
+                                                        s3select_resource_id);
+
+      ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object->get_error_description() << "}" << dendl;
+      return -1;
     }
-  }
 
-  if (m_result.size() > strlen(PAYLOAD_LINE)) {
-    m_result.append(END_PAYLOAD_LINE);
-    int buff_len = create_message(m_result, m_result.size() - 12, header_size);
-    s->formatter->write_bin_data(m_result.data(), buff_len);
-    if (op_ret < 0) {
-      return op_ret;
+    if (chunk_number == 0)
+    {//success flow
+      if (op_ret < 0)
+      {
+        set_req_state_err(s, op_ret);
+      }
+      dump_errno(s);
+      // Explicitly use chunked transfer encoding so that we can stream the result
+      // to the user without having to wait for the full length of it.
+      end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
     }
+
+    chunk_number++;
   }
-  rgw_flush_formatter_and_reset(s, s->formatter);
+
+  m_aws_response_handler.get()->send_success_response();
 
   return status;
 }
@@ -6396,21 +6504,8 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
     return 0;
   }
 
-  if (chunk_number == 0) {
-    if (op_ret < 0) {
-      set_req_state_err(s, op_ret);
-    }
-    dump_errno(s);
-  }
-
   auto bl_len = bl.get_num_buffers();
 
-  // Explicitly use chunked transfer encoding so that we can stream the result
-  // to the user without having to wait for the full length of it.
-  if (chunk_number == 0) {
-    end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);
-  }
-
   int status=0;
   int i=0;
 
@@ -6432,7 +6527,6 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
     i++;
   }
 
-  chunk_number++;
 
   return status;
 }
index fd539a2553b23fb5b4d50bea8286d12f4545bfd3..5db02d128326d85c1dc0c6719d5682b8b1a5aa0c 100644 (file)
@@ -914,42 +914,27 @@ class s3select;
 class csv_object;
 }
 
+class aws_response_handler;
+
 class RGWSelectObj_ObjStore_S3 : public RGWGetObj_ObjStore_S3
 {
 
 private:
   std::unique_ptr<s3selectEngine::s3select> s3select_syntax;
   std::string m_s3select_query;
-  std::string m_result;
   std::unique_ptr<s3selectEngine::csv_object> m_s3_csv_object;
   std::string m_column_delimiter;
   std::string m_quot;
   std::string m_row_delimiter;
   std::string m_compression_type;
   std::string m_escape_char;
-  std::unique_ptr<char[]>  m_buff_header;
   std::string m_header_info;
   std::string m_sql_query;
+  std::unique_ptr<aws_response_handler> m_aws_response_handler;
 
 public:
   unsigned int chunk_number;
 
-  enum header_name_En
-  {
-    EVENT_TYPE,
-    CONTENT_TYPE,
-    MESSAGE_TYPE
-  };
-  static const char* header_name_str[3];
-
-  enum header_value_En
-  {
-    RECORDS,
-    OCTET_STREAM,
-    EVENT
-  };
-  static const char* header_value_str[3];
-
   RGWSelectObj_ObjStore_S3();
   virtual ~RGWSelectObj_ObjStore_S3();
 
@@ -958,15 +943,6 @@ public:
   virtual int get_params(optional_yield y) override;
 
 private:
-  void encode_short(char* buff, uint16_t s, int& i);
-
-  void encode_int(char* buff, u_int32_t s, int& i);
-
-  int create_header_records(char* buff);
-
-  std::unique_ptr<boost::crc_32_type> crc32;
-
-  int create_message(std::string&, u_int32_t result_len, u_int32_t header_len);
 
   int run_s3select(const char* query, const char* input, size_t input_length);