Production-ready indexer for multiple blockchains with four cooperating workers: Regular (real-time), Catchup (historical), Rescanner (failed/missed blocks), Manual (missing block ranges queued in Redis).
Designed for multi-chain environments: each chain is indexed independently and emits its own events.
Currently Supported:
- Ethereum
- BSC (Binance Smart Chain)
- TRON
- Polygon
- Arbitrum
- Optimism
Roadmap:
- Bitcoin
- Litecoin
- Dogecoin
- Solana
```mermaid
flowchart TB
    subgraph Workers ["Workers"]
        direction LR
        R[RegularWorker]
        C[CatchupWorker]
        M[ManualWorker]
        Re[RescannerWorker]
    end

    BW[BaseWorker]

    subgraph Storage ["Storage & Messaging"]
        direction LR
        KV[(KV Store)]
        NATS[(NATS Events)]
        FChan[(failedChan)]
    end

    Redis[(Redis ZSET)]

    %% Workers to BaseWorker
    R --> BW
    C --> BW
    M --> BW
    Re --> BW

    %% BaseWorker connections
    BW --> KV
    BW --> NATS
    BW --> FChan

    %% Feedback failedChan -> Rescanner
    FChan -.-> Re

    %% ManualWorker special connection
    M -.-> Redis
```

Logic Flow:
- RegularWorker: real-time indexing, reorg handling, error reporting
- CatchupWorker: backfills gaps, tracks progress, cleans up ranges
- ManualWorker: consumes Redis ranges, concurrent-safe backfill
- RescannerWorker: retries failed blocks, updates KV on success

```bash
git clone https://github.com/fystack/multichain-indexer.git
cd multichain-indexer
go mod download
cp configs/config.example.yaml configs/config.yaml
go build -o indexer cmd/indexer/main.go

# Index EVM & TRON in real-time
./indexer index --chains=ethereum_mainnet,tron_mainnet

# Add catchup worker for historical gaps
./indexer index --chains=ethereum_mainnet,tron_mainnet --catchup

# Add manual worker for missing blocks
./indexer index --chains=ethereum_mainnet,tron_mainnet --manual

# For help
./indexer --help
```

BaseWorker:
- Shared logic for all worker types
- Rate limiting, logging, bloom filter, KV store integration, infrastructure management
- Sends error blocks to `failedChan` and stores them in `<chain>/failed_blocks/<block>`
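
The failure path above (persist to KV, then notify through `failedChan`) can be sketched as follows. This is a minimal illustration, not the project's actual code: the `FailedBlock` type, the `KV` interface, the in-memory `memKV` store, and the choice of a non-blocking send are all assumptions.

```go
package main

import "fmt"

// FailedBlock is a hypothetical record for a block that could not be indexed.
type FailedBlock struct {
	Chain  string
	Number uint64
	Reason string
}

// KV is a hypothetical minimal key-value interface; the real store may differ.
type KV interface {
	Set(key, value string) error
}

// memKV is an in-memory stand-in for Badger or Consul.
type memKV map[string]string

func (m memKV) Set(key, value string) error {
	m[key] = value
	return nil
}

// failedBlockKey builds the <chain>/failed_blocks/<block> key.
func failedBlockKey(chain string, block uint64) string {
	return fmt.Sprintf("%s/failed_blocks/%d", chain, block)
}

// reportFailure persists the failure first, then tries to notify the rescanner.
// The send is non-blocking (an assumption): if the channel is full, the block
// is not queued, but the KV entry still allows a later retry.
func reportFailure(kv KV, failedChan chan<- FailedBlock, fb FailedBlock) (bool, error) {
	if err := kv.Set(failedBlockKey(fb.Chain, fb.Number), fb.Reason); err != nil {
		return false, err
	}
	select {
	case failedChan <- fb:
		return true, nil
	default:
		return false, nil
	}
}

func main() {
	kv := memKV{}
	ch := make(chan FailedBlock, 1)
	queued, err := reportFailure(kv, ch, FailedBlock{
		Chain:  "ethereum_mainnet",
		Number: 21500001,
		Reason: "rpc timeout",
	})
	fmt.Println(queued, err, kv)
}
```

Writing to KV before sending on the channel means a crash between the two steps loses only the notification, not the record of the failure.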

RegularWorker:
- Continuously processes latest blocks from RPC
- Saves progress to `<chain>/latest_block`
- For EVM, handles reorgs with a rollback window
- On block failure → BaseWorker stores it for retry
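
One common way to implement a rollback window is to compare each new block's parent hash with the hash already indexed at the previous height, and to step back a fixed number of blocks on a mismatch. The sketch below assumes that approach; the `Header` and `ReorgTracker` types and the window size are hypothetical and may not match how RegularWorker does it.

```go
package main

import "fmt"

// Header is a hypothetical, trimmed-down block header.
type Header struct {
	Number     uint64
	Hash       string
	ParentHash string
}

// ReorgTracker remembers the hashes of recently indexed blocks.
type ReorgTracker struct {
	window uint64            // assumed rollback window, in blocks
	hashes map[uint64]string // block number -> hash of the indexed block
}

func NewReorgTracker(window uint64) *ReorgTracker {
	return &ReorgTracker{window: window, hashes: make(map[uint64]string)}
}

// Accept records h if it extends the known chain. On a parent-hash mismatch it
// returns the height to resume from (h.Number - window, floored at 0) and
// forgets every stored hash at or above that height.
func (t *ReorgTracker) Accept(h Header) (resumeFrom uint64, reorg bool) {
	if h.Number > 0 {
		if prev, ok := t.hashes[h.Number-1]; ok && prev != h.ParentHash {
			var from uint64
			if h.Number > t.window {
				from = h.Number - t.window
			}
			for n := range t.hashes {
				if n >= from {
					delete(t.hashes, n)
				}
			}
			return from, true
		}
	}
	t.hashes[h.Number] = h.Hash
	// Drop the entry that just fell out of the window to bound memory.
	if h.Number >= t.window {
		delete(t.hashes, h.Number-t.window)
	}
	return 0, false
}

func main() {
	tracker := NewReorgTracker(3)
	tracker.Accept(Header{Number: 10, Hash: "a", ParentHash: "z"})
	tracker.Accept(Header{Number: 11, Hash: "b", ParentHash: "a"})
	// Block 12 claims a parent that is not the block 11 we indexed.
	from, reorg := tracker.Accept(Header{Number: 12, Hash: "c", ParentHash: "x"})
	fmt.Println(from, reorg) // 9 true
}
```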

CatchupWorker:
- Processes historical blocks in ranges `[start,end]`
- Uses KV `<chain>/catchup_progress/<start>-<end>` to track progress
- Deletes the key when a range is completed
- Integrates failed blocks from Rescanner
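
The progress-key lifecycle above (write while a range is in flight, delete on completion) might look like the sketch below. A plain map stands in for the KV store, and storing the last processed block as decimal text is an assumption, as are the function names.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

var errTransient = errors.New("transient RPC error")

// progressKey builds the <chain>/catchup_progress/<start>-<end> key.
func progressKey(chain string, start, end uint64) string {
	return fmt.Sprintf("%s/catchup_progress/%d-%d", chain, start, end)
}

// catchupRange indexes [start,end], resuming after the last block recorded in
// kv. It stops at the first error so a later run can resume, and deletes the
// key once the whole range has been processed.
func catchupRange(kv map[string]string, chain string, start, end uint64, index func(uint64) error) error {
	key := progressKey(chain, start, end)
	next := start
	if v, ok := kv[key]; ok {
		last, err := strconv.ParseUint(v, 10, 64)
		if err != nil {
			return fmt.Errorf("corrupt progress %q: %w", v, err)
		}
		next = last + 1
	}
	for n := next; n <= end; n++ {
		if err := index(n); err != nil {
			return fmt.Errorf("block %d: %w", n, err)
		}
		kv[key] = strconv.FormatUint(n, 10)
	}
	delete(kv, key)
	return nil
}

func main() {
	kv := map[string]string{}
	failOnce := true
	index := func(n uint64) error {
		if n == 103 && failOnce {
			failOnce = false
			return errTransient
		}
		return nil
	}
	fmt.Println(catchupRange(kv, "ethereum_mainnet", 100, 105, index), kv)
	fmt.Println(catchupRange(kv, "ethereum_mainnet", 100, 105, index), kv)
}
```

The first call stops at block 103 and leaves the key in place; the second resumes from 103 and removes it.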

ManualWorker:
- Handles explicit missing blocks (due to RPC errors, reorg skips, or manual intervention).
- Missing ranges are stored in a Redis ZSET:
  - Member format: `"start-end"`
  - Score = `start` (to sort ranges by block number)
  - Large ranges are split into small ranges (default 5 blocks) for finer retries
- Concurrency-safe with Redis locks (`SETNX + EX` via Lua)
- Workflow:
  - Claim unprocessed range (`GetNextRange`)
  - Process all blocks in `[start,end]`
  - Update progress with `SetRangeProcessed`
  - On full success → `RemoveRange`
  - On partial timeout → reinsert remaining `[current,end]`
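
To make the ZSET layout concrete, the sketch below splits a large range into `"start-end"` members scored by `start`, and builds the member to reinsert after a partial timeout. It deliberately leaves Redis out; `RangeMember`, `splitRange`, and `remaining` are hypothetical names, not the project's API.

```go
package main

import "fmt"

// RangeMember mirrors one ZSET entry: Member is "start-end", Score is start.
// Redis stores scores as floating-point numbers, hence float64.
type RangeMember struct {
	Member string
	Score  float64
}

// splitRange breaks [start,end] into chunks of at most size blocks.
func splitRange(start, end, size uint64) []RangeMember {
	if size == 0 || end < start {
		return nil
	}
	var out []RangeMember
	for s := start; s <= end; s += size {
		e := s + size - 1
		if e > end {
			e = end
		}
		out = append(out, RangeMember{
			Member: fmt.Sprintf("%d-%d", s, e),
			Score:  float64(s),
		})
	}
	return out
}

// remaining returns the member to reinsert after a partial timeout, where
// current is the first block that was not processed.
func remaining(current, end uint64) (RangeMember, bool) {
	if current > end {
		return RangeMember{}, false
	}
	return RangeMember{
		Member: fmt.Sprintf("%d-%d", current, end),
		Score:  float64(current),
	}, true
}

func main() {
	for _, m := range splitRange(100, 112, 5) {
		fmt.Println(m.Member, m.Score)
	}
	if m, ok := remaining(103, 104); ok {
		fmt.Println("reinsert", m.Member)
	}
}
```

With the default chunk size of 5, the range `[100,112]` becomes `100-104`, `105-109`, and `110-112`.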

RescannerWorker:
- Re-processes failed blocks from KV `<chain>/failed_blocks/<block>` or `failedChan`
- Updates KV when retry succeeds
- Removes blocks after max retry attempts
- Skips chain head block to reduce reorg risk
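
A single rescan pass over the failed blocks could be sketched as below. The retry limit of 3, the in-memory attempt counter, and the `Rescanner` type are assumptions made for illustration; the real worker reads from KV and `failedChan`.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// maxRetries is an assumed limit; the real value may be configurable.
const maxRetries = 3

var errStillFailing = errors.New("block still failing")

// Rescanner tracks how many times each failed block has been retried.
type Rescanner struct {
	attempts map[uint64]int
}

// Rescan retries every failed block below head once. Blocks that succeed are
// returned as recovered; blocks that reach maxRetries are returned as dropped.
// Both are removed from the attempt map. Blocks at or above head are skipped
// because they may still be reorganized.
func (r *Rescanner) Rescan(head uint64, index func(uint64) error) (recovered, dropped []uint64) {
	for block, tries := range r.attempts {
		if block >= head {
			continue
		}
		if err := index(block); err == nil {
			delete(r.attempts, block)
			recovered = append(recovered, block)
			continue
		}
		if tries+1 >= maxRetries {
			delete(r.attempts, block)
			dropped = append(dropped, block)
			continue
		}
		r.attempts[block] = tries + 1
	}
	// Map iteration order is random, so sort for stable output.
	sort.Slice(recovered, func(i, j int) bool { return recovered[i] < recovered[j] })
	sort.Slice(dropped, func(i, j int) bool { return dropped[i] < dropped[j] })
	return recovered, dropped
}

func main() {
	r := &Rescanner{attempts: map[uint64]int{10: 0, 11: 2, 20: 0}}
	recovered, dropped := r.Rescan(20, func(n uint64) error {
		if n == 11 {
			return errStillFailing
		}
		return nil
	})
	fmt.Println(recovered, dropped, r.attempts)
}
```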

| Key | Purpose |
|---|---|
| `<chain>/latest_block` | RegularWorker progress |
| `<chain>/catchup_progress/<start>-<end>` | CatchupWorker progress per range |
| `<chain>/failed_blocks/<block>` | Failed blocks metadata for retry |
| `<chain_type>/<address>` | Public key store |
| `missing_blocks:<chain>` | Redis ZSET of missing ranges |
| `processing:<chain>:<start>-<end>` | Redis lock key for concurrent claim |
| `processed:<chain>:<start>-<end>` | Last processed block in range |
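
Keeping key construction in one place helps avoid typos in these formats. The helpers below are a sketch covering the RegularWorker and Redis keys from the table; the function names are hypothetical.

```go
package main

import "fmt"

// latestBlockKey is the KV key holding RegularWorker progress.
func latestBlockKey(chain string) string {
	return chain + "/latest_block"
}

// missingBlocksKey is the Redis ZSET of missing ranges for a chain.
func missingBlocksKey(chain string) string {
	return "missing_blocks:" + chain
}

// processingKey is the Redis lock key used to claim a range.
func processingKey(chain string, start, end uint64) string {
	return fmt.Sprintf("processing:%s:%d-%d", chain, start, end)
}

// processedKey holds the last processed block within a range.
func processedKey(chain string, start, end uint64) string {
	return fmt.Sprintf("processed:%s:%d-%d", chain, start, end)
}

func main() {
	fmt.Println(latestBlockKey("ethereum_mainnet"))
	fmt.Println(missingBlocksKey("tron_mainnet"))
	fmt.Println(processingKey("tron_mainnet", 75144237, 75144241))
	fmt.Println(processedKey("tron_mainnet", 75144237, 75144241))
}
```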
Start required services before running the indexer (docker-compose provided):
- NATS server (events)
- Consul (KV) or Badger (embedded)
- PostgreSQL (wallet address repo)
- Redis (for Bloom filter or ManualWorker)

```bash
docker-compose up -d
```

Configuration:
- Chains: configurable (`start_block`, `batch_size`, `poll_interval`)
- KVStore: BadgerDB / in-memory / Consul
- Bloom Filter: Redis or in-memory
- Event Emitter: NATS streaming
- RPC Providers: failover + rate-limiting
See configs/config.example.yaml for details.
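
The failover half of "RPC Providers: failover + rate-limiting" can be illustrated with a small sketch that tries each configured node in order and returns the first success. It does not cover rate limiting, and the `Provider` type and `callWithFailover` function are hypothetical rather than the indexer's real client.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnavailable = errors.New("provider unavailable")

// Provider is a hypothetical wrapper around one RPC node.
type Provider struct {
	URL  string
	Call func(method string) (string, error)
}

// callWithFailover tries providers in order and returns the first successful
// result. If all of them fail, the individual errors are joined together
// (errors.Join requires Go 1.20 or newer).
func callWithFailover(providers []Provider, method string) (string, error) {
	if len(providers) == 0 {
		return "", errors.New("no providers configured")
	}
	var errs []error
	for _, p := range providers {
		res, err := p.Call(method)
		if err == nil {
			return res, nil
		}
		errs = append(errs, fmt.Errorf("%s: %w", p.URL, err))
	}
	return "", errors.Join(errs...)
}

func main() {
	providers := []Provider{
		{URL: "primary", Call: func(string) (string, error) { return "", errUnavailable }},
		{URL: "fallback", Call: func(string) (string, error) { return "0x1480b40", nil }},
	}
	res, err := callWithFailover(providers, "eth_blockNumber")
	fmt.Println(res, err)
}
```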
- Multi-chain support: independent workers per chain
- Auto-catchup: detect gaps → backfill → cleanup
- Failed block recovery: persisted + retryable
- Manual backfill: Redis-driven, safe for concurrency
- State persistence: KV + BlockStore → restart-safe

The chain names passed to `--chains` must match the names defined in `configs/config.yaml`. Example: `ethereum_mainnet`, `tron_mainnet`.

```bash
# Real-time only
./indexer index --chains=ethereum_mainnet,tron_mainnet

# Real-time + catchup (fill historical gaps)
./indexer index --chains=ethereum_mainnet,tron_mainnet --catchup

# Add manual worker to process missing blocks from Redis
./indexer index --chains=ethereum_mainnet,tron_mainnet --manual

# Debug mode (extra logs)
./indexer index --chains=ethereum_mainnet,tron_mainnet --debug

# NATS JetStream transaction monitoring (using NATS CLI)
nats stream add transfer --subjects="transfer.event.*" --storage=file --retention=workqueue
nats consumer add transfer transaction-consumer --filter="transfer.event.dispatch" --deliver=all --ack=explicit
nats consumer sub transfer transaction-consumer

# Initialize bloom filter and kvstore
./wallet-kv-load run --config configs/config.yaml --batch 10000 --debug

# Migrate from Badger to Consul (edit migrate.yaml first)
./kv-migrate run --config configs/config.yaml --dry-run
```

```yaml
chains:
  ethereum_mainnet: # <- this is the chain name
    type: "evm"
    nodes:
      - url: "https://eth-mainnet.g.alchemy.com/v2/${API_KEY}"
        auth:
          type: "header"
          key: "Authorization"
          value: "Bearer ${API_KEY}"
      - url: "https://rpc.ankr.com/eth"
    start_block: 21500000
    poll_interval: "6s"
    client:
      timeout: "20s"
      max_retries: 3
      retry_delay: "5s"
    throttle:
      rps: 8
      burst: 16

  tron_mainnet:
    type: "tron"
    nodes:
      - url: "https://api.trongrid.io"
        auth:
          type: "header"
          key: "TRON-PRO-API-KEY"
          value: "${TRON_API_KEY}"
      - url: "https://tron-rpc.publicnode.com"
    start_block: 75144237
    poll_interval: "8s"
    client:
      timeout: "20s"
      max_retries: 5
      retry_delay: "10s"
    throttle:
      rps: 5
      burst: 8
```

The indexer publishes transaction events to NATS JetStream. Here's how to consume them:
- Stream Name: `transfer`
- Subjects: `transfer.event.*`
- Transaction Topic: `transfer.event.dispatch`
- Storage: FileStorage with WorkQueue retention policy
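
Using the NATS CLI:

```bash
# Create stream (if not exists)
nats stream add transfer --subjects="transfer.event.*" --storage=file --retention=workqueue

# Create consumer
nats consumer add transfer my-consumer --filter="transfer.event.dispatch" --deliver=all --ack=explicit

# Consume transactions
nats consumer sub transfer my-consumer

# Get stream info
nats stream info transfer
```

Using Go:

```go
package main

import (
	"context"
	"log"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

func main() {
	nc, err := nats.Connect("nats://localhost:4222")
	if err != nil {
		log.Fatalf("connect to NATS: %v", err)
	}
	defer nc.Close()

	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatalf("create JetStream context: %v", err)
	}

	// Look up the consumer created with the NATS CLI
	consumer, err := js.Consumer(context.Background(), "transfer", "my-consumer")
	if err != nil {
		log.Fatalf("get consumer: %v", err)
	}

	// Consume messages; the consumer uses explicit acks, so ack each one
	if _, err := consumer.Consume(func(msg jetstream.Msg) {
		log.Printf("Transaction: %s", string(msg.Data()))
		if err := msg.Ack(); err != nil {
			log.Printf("ack failed: %v", err)
		}
	}); err != nil {
		log.Fatalf("start consuming: %v", err)
	}

	select {} // Keep running
}
```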