Rodando queries no Amazon Athena por usando python
O Amazon Athena é um serviço de queries que facilita o acesso de dados no S3 por meio do uso do SQL.
É interessante ler mais sobre as estruturas de serviços do Amazon e como elas se conectam para que fique um pouco mais claro.
Boto3 é um grande aliado nisso para quem usará python para pegar/analisar os dados. A sua documentação é bem completa e ajuda bastante no entendimento:
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html
(Obs: Considerando que as credencias aws estejam já corretamente setadas na máquina)
import boto3
import time
- Checar em qual perfil estão os dados
session = boto3.Session(profile_name='profile_name')
athena = session.client(service_name='athena', region_name='sa-east-1')
Aqui, diferença com pegar arquivos diretamente de bucket do S3.
Para lidar com problema de paginação, numa view ou tabela, por exemplo:
query_string: a SQL-like query that Athena will execute
client: an Athena client created with boto3
source: https://gist.github.com/schledererj/b2e2a800998d61af2bbdd1cd50e08b76
def fetchall_athena(query_string, client):
query_id = client.start_query_execution(
QueryString=query_string,
QueryExecutionContext={
'Database': 'zeus'
},
ResultConfiguration={
'OutputLocation': 'path_para_a_query'
}
)['QueryExecutionId']
query_status = None
while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
query_status = client.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
if query_status == 'FAILED' or query_status == 'CANCELLED':
raise Exception('Athena query with the string "{}" failed or was cancelled'.format(query_string))
time.sleep(100)
results_paginator = client.get_paginator('get_query_results')
results_iter = results_paginator.paginate(
QueryExecutionId=query_id,
PaginationConfig={
'PageSize': 1000
}
)
results = []
column_names = None
for results_page in results_iter:
for row in results_page['ResultSet']['Rows']:
column_values = [col.get('VarCharValue', None) for col in row['Data']]
if not column_names:
column_names = column_values
else:
results.append(dict(zip(column_names, column_values)))
return results
- Get
query_string = 'SELECT * FROM xxxx' # coloque aqui sua query
client = athena
data_zeus = fetchall_athena(query_string=query_string, client=client)
df_data = pd.DataFrame(data_frame)
Export, para que não tenha que sempre fazer a query, e compressed, porque é relativamente grande:
compression_opts = dict(method='zip', archive_name='df.csv')
df_data.to_csv('df.zip', index=False, compression=compression_opts)