eq(response['Body'].read().decode('utf-8'), obj, 's3select error[ downloaded object not equal to uploaded objecy')
-def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE"):
+def run_s3select(bucket,key,query,column_delim=",",row_delim="\n",quot_char='"',esc_char='\\',csv_header_info="NONE", progress = False):
s3 = get_client()
result = ""
ExpressionType='SQL',
InputSerialization = {"CSV": {"RecordDelimiter" : row_delim, "FieldDelimiter" : column_delim,"QuoteEscapeCharacter": esc_char, "QuoteCharacter": quot_char, "FileHeaderInfo": csv_header_info}, "CompressionType": "NONE"},
OutputSerialization = {"CSV": {}},
- Expression=query,)
+ Expression=query,
+ RequestProgress = {"Enabled": progress})
except ClientError as c:
result += str(c)
return result
-
- for event in r['Payload']:
- if 'Records' in event:
- records = event['Records']['Payload'].decode('utf-8')
- result += records
-
+
+ if progress == False:
+ for event in r['Payload']:
+ if 'Records' in event:
+ records = event['Records']['Payload'].decode('utf-8')
+ result += records
+ else:
+ result = []
+ for event in r['Payload']:
+ if 'Records' in event:
+ records = event['Records']
+ result.append(records.copy())
+ if 'Progress' in event:
+ progress = event['Progress']
+ result.append(progress.copy())
+ if 'Stats' in event:
+ stats = event['Stats']
+ result.append(stats.copy())
+ if 'End' in event:
+ end = event['End']
+ result.append(end.copy())
return result
def remove_xml_tags_from_result(obj):
res_s3select = remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select count(*) from s3object where cast(_1 as int) != 0 ;')).replace("\n","")
s3select_assert_result( res_s3select_cast, res_s3select )
+
+@attr('s3select')
+def test_progress_expressions():
+
+ csv_obj = create_random_csv_object(1000000,10)
+
+ csv_obj_name = get_random_string()
+ bucket_name = "test"
+ upload_csv_object(bucket_name,csv_obj_name,csv_obj)
+
+ obj_size = len(csv_obj.encode('utf-8'))
+
+ res_s3select_response = run_s3select(bucket_name,csv_obj_name,"select sum(int(_1)) from s3object;",progress = True)
+ records_payload_size = len(remove_xml_tags_from_result( run_s3select(bucket_name,csv_obj_name, 'select sum(int(_1)) from s3object;')).replace("\n",""))
+
+ total_response = len(res_s3select_response)
+
+ # To do: Validate bytes processed after supporting compressed data
+ s3select_assert_result(obj_size, res_s3select_response[total_response-3]['Details']['BytesScanned'])
+ s3select_assert_result(records_payload_size, res_s3select_response[total_response-3]['Details']['BytesReturned'])
+
+ # stats response payload validation
+ s3select_assert_result(obj_size, res_s3select_response[total_response-2]['Details']['BytesScanned'])
+ s3select_assert_result(records_payload_size, res_s3select_response[total_response-2]['Details']['BytesReturned'])
+
+ # end response
+ s3select_assert_result({}, res_s3select_response[total_response-1])
+
+
+
+