This repo contains everything necessary to run an Aptos ETL pipeline (it is a monorepo).
To deploy your own instance of this pipeline, you can find full end-to-end deployment scripts and documentation here.
The overall architecture is as follows:
- An Aptos mainnet full node and testnet full node are both deployed in a Kubernetes cluster. These nodes act as the data source for this pipeline.
- Data extraction from the nodes and transformation into table records is performed by the Rust code in the `extractor_transformer/` directory. Two instances of this code will need to be deployed: one for `mainnet` and one for `testnet`, with the environment variables and CLI being configured for each network. This code should be deployed in Kubernetes, and it will need access to the Aptos node's gRPC port. It will also need to authenticate with GCP to dump the generated records into GCS buckets, and to subscribe to a Pub/Sub subscription.
- To ensure that multiple instances of the `extractor_transformer` do not process the same data from the Aptos node, an indexing coordinator script will need to be deployed in Kubernetes as well (one for each network). This code is written in Python, and can be found in the `indexing_coordinator/` directory. It will need to authenticate with GCP to publish and subscribe to Pub/Sub topics.
- The coordination performed by `indexing_coordinator` works by publishing ranges of transaction numbers (called "versions" on Aptos) to a Google Pub/Sub topic. The `extractor_transformer` instances each pull their tasks from the Pub/Sub topic, and make transaction requests to the node's gRPC interface in parallel. To ensure that the `extractor_transformer` instances do not receive the same messages from the Pub/Sub topic, all of them will use the same subscription to the topic. This is known as "competing consumers" or "competing subscribers". In testing, Pub/Sub seems to evenly distribute messages to each subscriber.
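The range-publishing idea described above can be sketched roughly as follows. This is a minimal illustration and not the code in `indexing_coordinator/`: the function names, the batch size, and the JSON message format (`start`/`end` keys) are all assumptions made for the example. The `publish` argument is any callable that accepts the encoded message bytes, so the sketch runs without GCP credentials.

```python
import json
from typing import Callable, Iterator, Tuple


def version_ranges(start: int, end: int, batch_size: int) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (first, last) version ranges that cover [start, end]."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    first = start
    while first <= end:
        last = min(first + batch_size - 1, end)
        yield first, last
        first = last + 1


def encode_task(first: int, last: int) -> bytes:
    # Hypothetical message format; the real coordinator may encode ranges differently.
    return json.dumps({"start": first, "end": last}).encode("utf-8")


def publish_ranges(start: int, end: int, batch_size: int,
                   publish: Callable[[bytes], object]) -> int:
    """Publish one task message per version range and return the message count."""
    count = 0
    for first, last in version_ranges(start, end, batch_size):
        publish(encode_task(first, last))
        count += 1
    return count


if __name__ == "__main__":
    # Collect messages in a list instead of sending them to Pub/Sub.
    sent = []
    publish_ranges(0, 2499, 1000, sent.append)
    for message in sent:
        print(message.decode("utf-8"))
```

In a real deployment, `publish` could wrap the Pub/Sub publisher client's `publish(topic_path, data)` call. Because every `extractor_transformer` instance pulls from the same subscription, each published range should normally be delivered to only one of them.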
Batch and stream loading are both supported.
If using the IAC scripts in this repo, the mainnet pipeline will use streaming inserts, and the testnet pipeline will use batch loading.
- Streaming works by publishing records as Protocol Buffers messages to Pub/Sub topics (one topic per table), which Dataflow then subscribes to and inserts into BigQuery.
- Batch loading works by uploading records as JSON files to Google Cloud Storage. Cloud Composer then copies these records from GCS to BigQuery hourly.
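As a rough illustration of the batch path, the sketch below serializes records as newline-delimited JSON, which is the JSON layout BigQuery load jobs accept. The record fields, the function names, and the hourly object naming scheme are assumptions made for this example; the actual layout used by `extractor_transformer` and the loader DAGs may differ.

```python
import json
from datetime import datetime, timezone
from typing import Dict, Iterable


def to_ndjson(records: Iterable[Dict]) -> str:
    """Serialize records as newline-delimited JSON, one object per line."""
    return "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in records)


def hourly_object_name(table: str, ts: datetime) -> str:
    # Hypothetical naming scheme: one object prefix per table and hour.
    return f"{table}/{ts.strftime('%Y/%m/%d/%H')}/records.json"


if __name__ == "__main__":
    # Hypothetical records; real table columns are defined in the schemas directory.
    records = [{"version": 1, "success": True}, {"version": 2, "success": False}]
    name = hourly_object_name("transactions", datetime(2024, 1, 2, 3, 45, tzinfo=timezone.utc))
    print(name)
    print(to_ndjson(records), end="")
```

Grouping objects by hour would let an hourly load job pick up only the files written since the previous run, though this is only one possible layout.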
- `extractor_transformer`: Rust codebase for data extraction from the node, transformation into table records, and dumping into GCS buckets
- `indexing_coordinator`: Python codebase for coordinating multiple instances of `extractor_transformer` in Kubernetes
- `loader`: Cloud Composer scripts (aka Airflow DAGs) for loading data from GCS buckets into BigQuery
- `iac`: Infrastructure-as-code, such as terraform scripts, helm charts, and BigQuery tables and GCS buckets creation scripts
- `scripts`: Various utilities, such as build scripts for `extractor_transformer` and `indexing_coordinator`
- `schemas`: The table schemas for each of the BigQuery tables, in JSON format. Can be used to create the tables using `bq mk` command (also see `iac/create_tables.sh`)
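For orientation, the sketch below shows how a `bq mk --table` invocation could be assembled from a schema file. It assumes each schema file is named after its table (for example `transactions.json`), and the dataset name is a placeholder; neither is confirmed by this repo, and `iac/create_tables.sh` remains the authoritative script.

```python
from pathlib import Path
from typing import List


def bq_mk_command(dataset: str, schema_file: Path) -> List[str]:
    """Build the argument list for creating one table from a JSON schema file."""
    # Assumption: the table name equals the schema file name without ".json".
    table = schema_file.stem
    return ["bq", "mk", "--table", f"{dataset}.{table}", schema_file.as_posix()]


if __name__ == "__main__":
    # "my_dataset" and the schemas/ path are placeholders for this example.
    for schema in sorted(Path("schemas").glob("*.json")):
        print(" ".join(bq_mk_command("my_dataset", schema)))
```

The script only prints the commands, so they can be reviewed before being run against a real project.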

