]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
add unique pointer (rgw_api); verify magic number for parquet objects; s3select modul... wip-s3select-parquet-object-processing-2
authorgal salomon <gal.salomon@gmail.com>
Fri, 16 Apr 2021 17:24:50 +0000 (20:24 +0300)
committergalsalomon66 <gal.salomon@gmail.com>
Wed, 21 Jul 2021 15:59:46 +0000 (18:59 +0300)
Signed-off-by: gal salomon <gal.salomon@gmail.com>
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h

index 8a5b7f5159edb225f856b59dc2d35c5a5a975413..4b6c4f624c2019ef85523c3e29bab3162566c400 100644 (file)
@@ -6096,16 +6096,16 @@ const char* RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "applica
 
 RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
   s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+  m_s3select_query(""),
   m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
   m_s3_parquet_object(std::unique_ptr<s3selectEngine::parquet_object>()),
   m_buff_header(std::make_unique<char[]>(1000)),
-  chunk_number(0),
-  m_s3select_query(""),
   m_parquet_type(false),
+  chunk_number(0),
   crc32(std::unique_ptr<boost::crc_32_type>())
 {
   set_get_data(true);
-  m_rgw_api = new s3selectEngine::rgw_s3select_api;
+  m_rgw_api = std::unique_ptr<s3selectEngine::rgw_s3select_api>(new s3selectEngine::rgw_s3select_api );
   fp_get_obj_size = [&]() { return get_obj_size(); };
   fp_range_req = [&](int64_t s, int64_t len, void *buff,optional_yield* y) { return range_request(s, len, buff, y); }; 
 
@@ -6287,7 +6287,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input,
     }
 
     if (m_parquet_type){
-      m_s3_parquet_object = std::unique_ptr<s3selectEngine::parquet_object>(new s3selectEngine::parquet_object(s->object->get_name(), s3select_syntax.get(), m_rgw_api));
+      m_s3_parquet_object = std::unique_ptr<s3selectEngine::parquet_object>(new s3selectEngine::parquet_object(s->object->get_name(), s3select_syntax.get(), m_rgw_api.get()));
     } 
     else {
       m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
@@ -6326,7 +6326,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input,
   else
   {
     if (m_parquet_type){
-            //TODO implement formatter call-back
             status = m_s3_parquet_object->run_s3select_on_object(m_result,fp_s3select_result_format,fp_result_header_format);
 
     }
@@ -6345,7 +6344,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select(const char* query, const char* input,
   }
 
   
-  //TODO for parquet processing should pass call-back to handle output
   if (m_parquet_type == false && m_result.size() > strlen(PAYLOAD_LINE)){
     
     m_result.append(END_PAYLOAD_LINE);
@@ -6451,17 +6449,21 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 {
   
   int status = 0;
-  size_t s1 = 0;
   char parquet_magic[4];
+  static constexpr uint8_t parquet_magic1[4] = {'P', 'A', 'R', '1'};
+  static constexpr uint8_t parquet_magicE[4] = {'P', 'A', 'R', 'E'};
 
   get_params(y);
   m_rgw_api->m_y = &y;
 
   if (m_parquet_type)
   {
-    //in case of CSV not to call range request
-    range_request(0, 4, parquet_magic,&y); // range_request -> RGWGetObj::execute -> get_params
-                                        //TODO check parquet magic
+    range_request(0, 4, parquet_magic,&y);
+    
+    if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)){
+      ldout(s->cct, 10) << s->object->get_name() << " not contains parquet magic " << dendl;
+      return;
+    }
 
     status = run_s3select(m_sql_query.c_str(), 0, 0);
 
@@ -6510,7 +6512,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
     if (requested_buffer.size() < m_request_range)
     {
       ldout(s->cct, 10) << " s3select: need another round buffe-size: " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
-      return 0; //need another fetch
+      return 0;
     }
 
     ldout(s->cct, 10) << " s3select: buffer is complete " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
index 3a44fde236aa4b0a5e2dc0e04998542f2ed33f98..3bc3fdaa98c749a135fe7bc652636b034d62c6ad 100644 (file)
@@ -909,7 +909,7 @@ private:
 
   //parquet request 
   bool m_parquet_type;
-  s3selectEngine::rgw_s3select_api* m_rgw_api;
+  std::unique_ptr<s3selectEngine::rgw_s3select_api> m_rgw_api;
   size_t m_request_range;//a request for range may statisfy by several calls to send_response_date (call back)
   std::string requested_buffer;
   std::string range_req_str;