Pipelines de dados com monitoramento na AWS
Este post mostra como construir e viabilizar um bom gerenciamento de pipeline de dados utilizando serviços serverless da AWS.
Requisitos:
- Conta AWS
Parte 1: Coletando dados
Demostração de contrução de uma função AWS Lambda que transfere dados de uma API pública para um bucket S3.
Provisionando Lambda
Esta Lambda age apenas como uma proxy, para disponibilizar os dados em um bucket S3. Para provisionar bastam as configurações padrão, neste caso com Python runtime.
Código e configuração da Lambda
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import os
import boto3
import requests
from botocore.exceptions import NoCredentialsError
def lambda_handler(event, context):
# URL da API pública "Public APIs"
url = "https://api.publicapis.org/entries"
# Fazer a solicitação GET
response = requests.get(url)
# Verificar se a solicitação foi bem-sucedida
if response.status_code == 200:
# Salvar os dados em um arquivo JSON
with open("/tmp/public_apis.json", "w") as f:
f.write(response.text)
# Obter o nome do bucket do objeto do evento
bucket_name = event['bucket_name']
# Criar um cliente S3
s3 = boto3.client('s3')
# Fazer upload do arquivo para S3
try:
s3.upload_file("/tmp/public_apis.json", bucket_name, "public_apis.json")
return {
'statusCode': 200,
'body': 'Conjunto de dados carregado com sucesso no bucket S3'
}
except FileNotFoundError:
return {
'statusCode': 404,
'body': 'O arquivo não foi encontrado'
}
except NoCredentialsError:
return {
'statusCode': 401,
'body': 'Credenciais não disponíveis'
}
else:
return {
'statusCode': response.status_code,
'body': 'Solicitação falhou'
}
Evento gatilho
Configurar o evento de teste com o JSON:
1
2
3
{
"bucket_name": "seu-nome-bucket"
}
Permissionamento da Lambda
Criar um bucket no Amazon S3
https://s3.console.aws.amazon.com/s3/get-started
Configurar o bucket
Permissionamento do bucket
Para configurar permissões de acesso para que um bucket do Amazon S3 seja público (somente leitura), pode-se seguir estas etapas:
- Abrir o console do Amazon S3.
- No painel de navegação do Buckets, escolher o nome do seu bucket.
- Escolher a guia “Permissões”.
- Retirar o bloqueio de acesso público
- Em “Gerenciamento de acesso ao bucket”, escolher “Editar”.
- Em “Política de bucket”, inserir a seguinte política, substituindo
'your_bucket_name':
1
2
3
4
5
6
7
8
9
10
11
12
{
"Version":"2012-10-17",
"Statement":[
{
"Sid":"PublicReadGetObject",
"Effect":"Allow",
"Principal": "*",
"Action":["s3:GetObject"],
"Resource":["arn:aws:s3:::your_bucket_name/*"]
}
]
}
Essa política permite que qualquer pessoa leia os objetos no bucket.
Parte 2: Pipeline de Dados
Criar bucket de dados processados
Criar um job no AWS Glue
Catalogar com crawler
Desenvolver ETL
Segue um exemplo minimalista de script de ETL em Python, que quebra 50% das vezes para ajudar a explorar as ferramentas de monitoramento na Parte 3.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import json
import boto3
import pandas as pd
import random
# 50% breaks
if random.random() < 0.5:
raise Exception("Random exception")
# Initialize boto3 client
s3 = boto3.client('s3')
def read_from_s3(bucket, key):
response = s3.get_object(Bucket=bucket, Key=key)
content = response['Body'].read().decode('utf-8')
return json.loads(content)
def write_to_s3(bucket, key, data):
s3.put_object(Body=data, Bucket=bucket, Key=key)
def process_data(input_bucket_name, output_bucket_name, input_key, output_key):
# Read JSON from S3
data = read_from_s3(input_bucket_name, input_key)
# Serialize the content from key "entries" in a dataframe
df = pd.DataFrame(data['entries'])
description = df.describe().to_csv()
write_to_s3(output_bucket_name, output_key, description)
# Call the function with your bucket name and keys
process_data('felipe-gomes-miyazato-bucket', 'dlakeanalytics', 'public_apis.json', 'describe_public_apis.csv')
Parte 3: Monitoramento
CloudWatch Dashboards
Monitorar função lambda
Monitorar erros nos jobs do Glue
Ativar o Amazon CloudTrail
Configurar o CloudTrail