Este post mostra como construir e viabilizar um bom gerenciamento de pipeline de dados utilizando serviços serverless da AWS.

Requisitos:

  • Conta AWS

Parte 1: Coletando dados

Demostração de contrução de uma função AWS Lambda que transfere dados de uma API pública para um bucket S3.

Provisionando Lambda

Esta Lambda age apenas como uma proxy, para disponibilizar os dados em um bucket S3. Para provisionar bastam as configurações padrão, neste caso com Python runtime.

Código e configuração da Lambda

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import os
import boto3
import requests
from botocore.exceptions import NoCredentialsError

def lambda_handler(event, context):
    # URL da API pública "Public APIs"
    url = "https://api.publicapis.org/entries"

    # Fazer a solicitação GET
    response = requests.get(url)

    # Verificar se a solicitação foi bem-sucedida
    if response.status_code == 200:
        # Salvar os dados em um arquivo JSON
        with open("/tmp/public_apis.json", "w") as f:
            f.write(response.text)
        
        # Obter o nome do bucket do objeto do evento
        bucket_name = event['bucket_name']

        # Criar um cliente S3
        s3 = boto3.client('s3')

        # Fazer upload do arquivo para S3
        try:
            s3.upload_file("/tmp/public_apis.json", bucket_name, "public_apis.json")
            return {
                'statusCode': 200,
                'body': 'Conjunto de dados carregado com sucesso no bucket S3'
            }
        except FileNotFoundError:
            return {
                'statusCode': 404,
                'body': 'O arquivo não foi encontrado'
            }
        except NoCredentialsError:
            return {
                'statusCode': 401,
                'body': 'Credenciais não disponíveis'
            }
    else:
        return {
            'statusCode': response.status_code,
            'body': 'Solicitação falhou'
        }

Evento gatilho

Configurar o evento de teste com o JSON:

1
2
3
{
  "bucket_name": "seu-nome-bucket"
}
Para usar o pacote requests, há uma layer pública disponível.

Permissionamento da Lambda

Para autorizar a lambda a inputar objetos no bucket é necessário acessar a configuração da role.
Em seguida, acessar a configuração Add permissions > Attach policies.
Neste case, será adicionada a política AmazonS3FullAccess.

Criar um bucket no Amazon S3

https://s3.console.aws.amazon.com/s3/get-started

Configurar o bucket

Para este case, as configurações padrão funcionam.

Permissionamento do bucket

Para configurar permissões de acesso para que um bucket do Amazon S3 seja público (somente leitura), pode-se seguir estas etapas:

  1. Abrir o console do Amazon S3.
  2. No painel de navegação do Buckets, escolher o nome do seu bucket.
  3. Escolher a guia “Permissões”.
  4. Retirar o bloqueio de acesso público
  5. Em “Gerenciamento de acesso ao bucket”, escolher “Editar”.
  6. Em “Política de bucket”, inserir a seguinte política, substituindo 'your_bucket_name':
1
2
3
4
5
6
7
8
9
10
11
12
    {
      "Version":"2012-10-17",
      "Statement":[
        {
          "Sid":"PublicReadGetObject",
          "Effect":"Allow",
          "Principal": "*",
          "Action":["s3:GetObject"],
          "Resource":["arn:aws:s3:::your_bucket_name/*"]
        }
      ]
    }

Essa política permite que qualquer pessoa leia os objetos no bucket.

Parte 2: Pipeline de Dados

Para esta parte é necessário configurar em Set up roles and users.
Marcar Read and write em Data access permissions e as outras configurações deixar no padrão.

Criar bucket de dados processados

Criar um job no AWS Glue

Para deixar o exemplo menos exaustivo, utilizar Python Shell script editor para criar o job.

Catalogar com crawler

Configurações.
Após rodar o crawler já é possível acessar a catalogação.
E dar uma boa espiada na estrutura dos dados.

Desenvolver ETL

Segue um exemplo minimalista de script de ETL em Python, que quebra 50% das vezes para ajudar a explorar as ferramentas de monitoramento na Parte 3.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import json
import boto3
import pandas as pd
import random

# 50% breaks
if random.random() < 0.5:
    raise Exception("Random exception")

# Initialize boto3 client
s3 = boto3.client('s3')

def read_from_s3(bucket, key):
    response = s3.get_object(Bucket=bucket, Key=key)
    content = response['Body'].read().decode('utf-8')
    return json.loads(content)

def write_to_s3(bucket, key, data):
    s3.put_object(Body=data, Bucket=bucket, Key=key)

def process_data(input_bucket_name, output_bucket_name, input_key, output_key):
    # Read JSON from S3
    data = read_from_s3(input_bucket_name, input_key)

    # Serialize the content from key "entries" in a dataframe
    df = pd.DataFrame(data['entries'])

    description = df.describe().to_csv()
    write_to_s3(output_bucket_name, output_key, description)

# Call the function with your bucket name and keys
process_data('felipe-gomes-miyazato-bucket', 'dlakeanalytics', 'public_apis.json', 'describe_public_apis.csv')

Parte 3: Monitoramento

CloudWatch Dashboards

Usar a ferramenta de dashboards nativa do CoudWatch para monitoria.

Monitorar função lambda

Adicionar widget de tabela de logs, configurada com o grupo de logs da lambda.

Monitorar erros nos jobs do Glue

Adicionar widget de tabela de logs, configurada com o grupo de logs de erros do Glue.

Ativar o Amazon CloudTrail

Para habilitar persistência dos logs crie uma Trail.

Configurar o CloudTrail

Para cada grupo de log que seja necessária a persistência crie uma Trail e configure com o grupo.