RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
s3select_syntax(std::make_unique<s3selectEngine::s3select>()),
+ m_s3select_query(""),
m_s3_csv_object(std::unique_ptr<s3selectEngine::csv_object>()),
m_s3_parquet_object(std::unique_ptr<s3selectEngine::parquet_object>()),
m_buff_header(std::make_unique<char[]>(1000)),
- chunk_number(0),
- m_s3select_query(""),
m_parquet_type(false),
+ chunk_number(0),
crc32(std::unique_ptr<boost::crc_32_type>())
{
set_get_data(true);
- m_rgw_api = new s3selectEngine::rgw_s3select_api;
+ m_rgw_api = std::unique_ptr<s3selectEngine::rgw_s3select_api>(new s3selectEngine::rgw_s3select_api );
fp_get_obj_size = [&]() { return get_obj_size(); };
fp_range_req = [&](int64_t s, int64_t len, void *buff,optional_yield* y) { return range_request(s, len, buff, y); };
}
if (m_parquet_type){
- m_s3_parquet_object = std::unique_ptr<s3selectEngine::parquet_object>(new s3selectEngine::parquet_object(s->object->get_name(), s3select_syntax.get(), m_rgw_api));
+ m_s3_parquet_object = std::unique_ptr<s3selectEngine::parquet_object>(new s3selectEngine::parquet_object(s->object->get_name(), s3select_syntax.get(), m_rgw_api.get()));
}
else {
m_s3_csv_object = std::unique_ptr<s3selectEngine::csv_object>(new s3selectEngine::csv_object(s3select_syntax.get(), csv));
else
{
if (m_parquet_type){
- //TODO implement formatter call-back
status = m_s3_parquet_object->run_s3select_on_object(m_result,fp_s3select_result_format,fp_result_header_format);
}
}
- //TODO for parquet processing should pass call-back to handle output
if (m_parquet_type == false && m_result.size() > strlen(PAYLOAD_LINE)){
m_result.append(END_PAYLOAD_LINE);
{
int status = 0;
- size_t s1 = 0;
char parquet_magic[4];
+ static constexpr uint8_t parquet_magic1[4] = {'P', 'A', 'R', '1'};
+ static constexpr uint8_t parquet_magicE[4] = {'P', 'A', 'R', 'E'};
get_params(y);
m_rgw_api->m_y = &y;
if (m_parquet_type)
{
- //in case of CSV not to call range request
- range_request(0, 4, parquet_magic,&y); // range_request -> RGWGetObj::execute -> get_params
- //TODO check parquet magic
+ range_request(0, 4, parquet_magic,&y);
+
+ if(memcmp(parquet_magic, parquet_magic1, 4) && memcmp(parquet_magic, parquet_magicE, 4)){
+ ldout(s->cct, 10) << s->object->get_name() << " not contains parquet magic " << dendl;
+ return;
+ }
status = run_s3select(m_sql_query.c_str(), 0, 0);
if (requested_buffer.size() < m_request_range)
{
ldout(s->cct, 10) << " s3select: need another round buffe-size: " << requested_buffer.size() << " request range length:" << m_request_range << dendl;
- return 0; //need another fetch
+ return 0;
}
ldout(s->cct, 10) << " s3select: buffer is complete " << requested_buffer.size() << " request range length:" << m_request_range << dendl;