Etl-engine is a lightweight, robust, and extensible headless ETL library designed for developers. It focuses on high-concurrency, high-performance data synchronization and is intended as a code-level alternative to heavy, GUI-based ETL tools such as Kettle (PDI).
Etl-engine provides the following three core features:
- High Performance: Batch operations and a non-blocking, cached pipeline design significantly improve data processing and database I/O speeds.
  📊 Real-World Test: For an insert/update task involving etl-engine is
- Self-Healing: Tasks do not crash on individual data errors; the engine supports automatic retries and error handling to keep long-running jobs stable.
- Full-Link Logging: Provides detailed execution metrics for debugging.
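To make the self-healing idea concrete, here is a minimal, self-contained sketch of retry-then-skip row handling. It is not Etl-engine's implementation: the class `SelfHealingSketch`, its `Result` counters, and the `processAll` method are hypothetical names invented for illustration, and the real retry policy may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch, not Etl-engine code: retry a failing row, then skip it and continue. */
final class SelfHealingSketch {

    /** Counters standing in for the execution metrics a log might report. */
    static final class Result {
        int succeeded;                                      // rows written successfully
        int retried;                                        // extra attempts made after a failure
        final List<String> failedRows = new ArrayList<>();  // rows given up on
    }

    /** Writes every row, retrying each one up to maxAttempts times before skipping it. */
    static Result processAll(List<String> rows, Consumer<String> writer, int maxAttempts) {
        Result result = new Result();
        for (String row : rows) {
            boolean done = false;
            for (int attempt = 1; attempt <= maxAttempts && !done; attempt++) {
                try {
                    writer.accept(row);
                    result.succeeded++;
                    done = true;
                } catch (RuntimeException e) {
                    // Count a retry only if another attempt will follow.
                    if (attempt < maxAttempts) {
                        result.retried++;
                    }
                }
            }
            if (!done) {
                // Record the bad row and keep going instead of aborting the task.
                result.failedRows.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        Consumer<String> writer = row -> {
            if (row.equals("bad")) {
                throw new IllegalArgumentException("invalid row");
            }
            // "flaky" fails only on its first attempt.
            if (row.equals("flaky") && seen.add(row)) {
                throw new IllegalStateException("transient failure");
            }
        };
        Result r = processAll(Arrays.asList("a", "flaky", "bad", "b"), writer, 3);
        // prints: 3 succeeded, 3 retries, failed=[bad]
        System.out.println(r.succeeded + " succeeded, " + r.retried + " retries, failed=" + r.failedRows);
    }
}
```

The point of the sketch is that a single bad row ends up in `failedRows` while the remaining rows are still processed.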
The core consists of only three main components: Node, Pipe, and Dataflow. All data loading logic is abstracted into extensible Nodes. In addition to the built-in JDBC data source Node, users can extend the base class to develop new data sources (such as HTTP or Redis) or custom transformation logic for specific business requirements.
The following code builds an ETL task that extracts data from Oracle and loads it into PostgreSQL with an upsert.
```mermaid
flowchart LR
    sqlInputNode --pipe--> upsertOutputNode
```

```java
// Create Oracle data source
DataSource dataSourceOracle = DataSourceUtil.getOracleDataSource();
// Create SQL input node
SqlInputNode sqlInputNode = new SqlInputNode(dataSourceOracle, "select * from t_resident_info");

// Create Postgres data source
DataSource dataSourcePG = DataSourceUtil.getPostgresDataSource();
// Create upsert output node
UpsertOutputNode upsertOutputNode = new UpsertOutputNode(dataSourcePG, "t_resident_info", 1000);
// Set the unique identifier (primary key) mapping, used to decide between insert and update
upsertOutputNode.setIdentityMapping(Arrays.asList(new Tuple2<>("ID", "ID")));

// Create a pipe with a buffer size of 1,000 rows
Pipe pipe = new Pipe(1000);
// Connect the SQL input node to the upsert output node
pipe.connect(sqlInputNode, upsertOutputNode);

// Create the dataflow instance
Dataflow dataflow = new Dataflow(sqlInputNode);
// Start the dataflow with a 5-minute timeout
dataflow.syncStart(5, TimeUnit.MINUTES);
```

A second scenario places a custom middle node between the SQL input node and the upsert output node to convert and mask column values.

```mermaid
flowchart LR
    SqlInputNode --pipe--> ValueConversionNode --pipe--> UpsertOutputNode
```

```java
package io.github.add2ws.node;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.liuneng.base.MiddleNode;
import org.liuneng.base.Row;
import org.liuneng.exception.NodeException;

@Slf4j
public class ValueConversionNode extends MiddleNode {

    @Override
    protected @NonNull Row process(@NonNull Row row) throws NodeException {
        // Convert the value of column 'gender' into the new column 'gender_name'.
        if ("1".equals(row.get("gender"))) {
            row.put("gender_name", "male");
        } else {
            row.put("gender_name", "female");
        }

        // Mask the 'address' column, keeping only its first and last characters.
        // Check for null before converting: String.valueOf(null) yields the text "null".
        Object address = row.get("address");
        if (address != null) {
            String masked = String.valueOf(address).replaceAll("^(.).*(.)$", "$1***$2");
            row.put("address", masked);
        }

        return row;
    }

    @Override
    public String[] getColumns() throws NodeException {
        // Declare the added column 'gender_name' for subsequent nodes.
        return new String[]{"gender_name"};
    }

    @Override
    public @NonNull Type getType() {
        // MiddleNode type: when connected to multiple downstream nodes,
        // determines whether the data stream is copied or distributed.
        return Type.COPY;
    }
}
```

```java
// Create Oracle data source and SQL input node
DataSource dataSourceOracle = DataSourceUtil.getOracleDataSource();
String sql = "SELECT * FROM ETL_BASE.T_RESIDENT_INFO WHERE 1=1 AND ROWNUM < 50000";
SqlInputNode sqlInputNode = new SqlInputNode(dataSourceOracle, sql);

// Create Postgres data source and upsert output node
DataSource dataSourcePG = DataSourceUtil.getPostgresDataSource();
UpsertOutputNode upsertOutputNode = new UpsertOutputNode(dataSourcePG, "t_resident_info", 1000);
upsertOutputNode.setIdentityMapping(Arrays.asList(new Tuple2<>("ID", "ID")));

// Create value conversion node
ValueConversionNode valueConversionNode = new ValueConversionNode();

// Connect Oracle input node to value conversion node
Pipe pipe = new Pipe(10000);
pipe.connect(sqlInputNode, valueConversionNode);

// Connect value conversion node to Postgres output node
pipe = new Pipe(10000);
pipe.connect(valueConversionNode, upsertOutputNode);

// Start dataflow
Dataflow dataflow = new Dataflow(sqlInputNode);
dataflow.syncStart(5, TimeUnit.MINUTES);
```

A third scenario: one SQL input node distributes the data stream to multiple output nodes based on column value evaluation.

```mermaid
flowchart LR
    SqlInputNode --Pipe--> ConditionNode
    ConditionNode --"Pipe[0](gender=1)"--> UpsertOutputNode
    ConditionNode --"Pipe[1](gender=2)"--> FileOutputNode
```

```java
package io.github.add2ws.node;

import lombok.NonNull;
import org.liuneng.base.MiddleNode;
import org.liuneng.base.Row;
import org.liuneng.exception.NodeException;

public class ConditionNode extends MiddleNode {

    @Override
    protected @NonNull Row process(@NonNull Row row) throws NodeException {
        Object gender = row.get("gender");
        if ("1".equals(gender)) {
            // Distribute rows where gender=1 to the first downstream pipe
            row.setPipeIndex(0);
        } else {
            // Otherwise, distribute to the second downstream pipe
            row.setPipeIndex(1);
        }
        return row;
    }

    @Override
    public String[] getColumns() throws NodeException {
        // No additional columns added
        return new String[0];
    }

    @Override
    public @NonNull Type getType() {
        // MiddleNode type: when connected to multiple downstream nodes,
        // determines whether the data stream is copied or distributed.
        return Type.SWITCH;
    }
}
```

```java
// Create Oracle data source and SQL input node
DataSource dataSourceOracle = DataSourceUtil.getOracleDataSource();
String sql = "SELECT * FROM ETL_BASE.T_RESIDENT_INFO WHERE 1=1 AND ROWNUM < 50000";
SqlInputNode sqlInputNode = new SqlInputNode(dataSourceOracle, sql);

// Create Postgres data source and upsert output node
DataSource dataSourcePG = DataSourceUtil.getPostgresDataSource();
UpsertOutputNode upsertOutputNode = new UpsertOutputNode(dataSourcePG, "t_resident_info", 1000);
upsertOutputNode.setIdentityMapping(Arrays.asList(new Tuple2<>("ID", "ID")));

// Create CSV file output node
FileOutputNode fileOutputNode = new FileOutputNode(
        "D:/t_resident_info_female_" + System.currentTimeMillis() + ".csv",
        FileOutputNode.Format.CSV);

// Create middle node
ConditionNode conditionNode = new ConditionNode();

// Connect Oracle input node to the condition node
Pipe pipe = new Pipe(10000);
pipe.connect(sqlInputNode, conditionNode);

// Connect middle node to Postgres output node
pipe = new Pipe(10000);
pipe.connect(conditionNode, upsertOutputNode);

// Connect middle node to CSV file output node
pipe = new Pipe(10000);
pipe.connect(conditionNode, fileOutputNode);

// Start dataflow
Dataflow dataflow = new Dataflow(sqlInputNode);
dataflow.syncStart(5, TimeUnit.MINUTES);
```

The core of Etl-engine consists of only the following three main components:
- Node: The starting point, ending point, and carrier for data transformation logic.
- Pipe: A non-blocking, cached queue responsible for transferring data between Nodes.
- Dataflow: The orchestrator and execution entry point for the task.
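The division of labor between the three components can be pictured with a small, self-contained sketch built only on the JDK. It does not use Etl-engine classes: `PipelineSketch` and its `run` method are hypothetical, and a bounded `ArrayBlockingQueue` is swapped in for the pipe purely for simplicity (the library describes its own pipe as non-blocking, so its internals likely differ). One thread plays the input node, another plays the output node, and `run` acts as the dataflow that starts both and waits with a timeout.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch, not Etl-engine code: input node -> buffered pipe -> output node. */
final class PipelineSketch {

    // Unique end-of-stream marker, compared by identity so it cannot collide with real rows.
    private static final String END = new String("END");

    /** Plays the dataflow role: starts both nodes and waits for them with a timeout. */
    static List<String> run(List<String> source, int bufferSize, long timeout, TimeUnit unit) {
        // The "pipe": a bounded buffer between the two nodes (assumption: blocking queue).
        BlockingQueue<String> pipe = new ArrayBlockingQueue<>(bufferSize);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // The "input node": pushes every source row into the pipe, then the end marker.
            Future<?> input = pool.submit(() -> {
                for (String row : source) {
                    pipe.put(row);
                }
                pipe.put(END);
                return null;
            });
            // The "output node": drains the pipe until the end marker arrives.
            Future<List<String>> output = pool.submit(() -> {
                List<String> sink = new ArrayList<>();
                String row;
                while ((row = pipe.take()) != END) {
                    sink.add(row);
                }
                return sink;
            });
            List<String> result = output.get(timeout, unit);
            input.get(timeout, unit);
            return result;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("dataflow did not finish", e);
        } finally {
            // Stop both node threads whether the run finished or timed out.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("r1", "r2", "r3");
        System.out.println(run(rows, 2, 5, TimeUnit.SECONDS));
    }
}
```

Because the buffer is smaller than the input, the two nodes run concurrently and the producer is throttled by the consumer, which is the general reason for placing a bounded buffer between nodes.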

