Description
Search before asking
- I had searched in the issues and found no similar issues.
Description
1. Background & Objectives
1.1 Business Scenarios
In Doris Cloud mode, users require periodic snapshot backups of cluster metadata to support:
- Disaster Recovery: Restore FE metadata to a specific point in time when corruption occurs
- Instance Cloning: Clone read-only/writable/rollback instances from a snapshot for testing and data analysis
- Compliance Auditing: Meet data backup SLA requirements for industries such as finance and healthcare
1.2 Framework Status
Doris already provides a complete snapshot framework skeleton, but all business methods are still "Not implemented" stubs:
| Layer | Component | Status |
|---|---|---|
| C++ Meta-Service | SnapshotManager base class | 18 virtual methods, all returning UNDEFINED_ERR or no-op |
| C++ Meta-Service | MetaServiceImpl RPC delegation | 7 RPC handlers with parameter validation + delegation logic implemented |
| Java FE | CloudSnapshotHandler base class | submitJob / refreshAutoSnapshotJob / cloneSnapshot all throw NotImplementedException |
| Java FE | SQL Commands | AdminCreate/Set/Drop ClusterSnapshot — 3 commands completed |
| Proto | cloud.proto | All RPC message definitions ready (7 RPCs + 14 messages) |
| TxnKv Keys | keys.h | snapshot_full_key / snapshot_reference_key defined |
| RPC Channel | MetaServiceProxy / MetaServiceClient | 7 snapshot methods wrapped |
| BE Display | SchemaClusterSnapshotsScanner | information_schema.cluster_snapshots implemented |
| Config | Config.java | 6 cloud_snapshot_* / multi_part_upload_* config items defined |
| Tests | meta_service_snapshot_test.cpp | Only BeginSnapshotTest exists (DISABLED) |
1.3 Design Goals
- G-1: Implement all 18 virtual methods of SnapshotManager, covering the complete snapshot lifecycle
- G-2: Implement a CloudSnapshotHandler subclass, completing FE-side scheduling, upload, and recovery logic
- G-3: Do not modify any existing Proto definitions or RPC signatures; develop entirely based on framework extension points
- G-4: All core flows covered by unit tests + integration tests, with 100% executable test cases
- G-5: Support S3/OSS/GCS/HDFS multi-storage backends via the StorageVaultAccessor abstraction layer
2. Technical Infrastructure Analysis
2.1 Framework Extension Point Verification
Extension Point 1: C++ SnapshotManager (Meta-Service Side)
Verified through source code — all business methods in the base class default to returning UNDEFINED_ERR:
```cpp
// cloud/src/snapshot/snapshot_manager.cpp (L72-L77)
void SnapshotManager::begin_snapshot(std::string_view instance_id,
                                     const BeginSnapshotRequest& request,
                                     BeginSnapshotResponse* response) {
    response->mutable_status()->set_code(MetaServiceCode::UNDEFINED_ERR);
    response->mutable_status()->set_msg("Not implemented");
}
```
Injection point confirmed: meta_server.cpp L82-83 and recycler.cpp L594 are the only locations where SnapshotManager is instantiated. Customization only requires replacing these two locations:
```cpp
// cloud/src/meta-service/meta_server.cpp (L82-83)
auto snapshot_mgr = std::make_shared<SnapshotManager>(txn_kv_);
auto meta_service = std::make_unique<MetaServiceImpl>(txn_kv_, rc_mgr, rate_limiter,
                                                      std::move(snapshot_mgr));

// cloud/src/recycler/recycler.cpp (L594)
snapshot_manager_ = std::make_shared<SnapshotManager>(txn_kv_);
```
Extension Point 2: Java CloudSnapshotHandler (FE Scheduling Side)
Dynamically loaded via reflection, with the class name controlled by Config.cloud_snapshot_handler_class:
```java
// CloudSnapshotHandler.java (L42-53)
public static CloudSnapshotHandler getInstance() {
    Class<CloudSnapshotHandler> theClass = (Class<CloudSnapshotHandler>) Class.forName(
            Config.cloud_snapshot_handler_class);
    Constructor<CloudSnapshotHandler> constructor = theClass.getDeclaredConstructor();
    return constructor.newInstance();
}
```
CloudEnv calls the handler's initialize() during initialize(), and starts the background daemon in startMasterOnlyDaemonThreads().
2.2 TxnKv Key Format Confirmation
```
// keys.h (L110-111)
0x03 "snapshot" ${instance_id} "full" ${timestamp} -> SnapshotPB
0x03 "snapshot" ${instance_id} "reference" ${timestamp} ${instance_id} -> ${empty_value}
```
- Full Key: Stores snapshot metadata SnapshotPB, with Versionstamp as the timestamp suffix
- Reference Key: Records instance IDs derived from this snapshot, used for reference counting and deletion protection
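As a rough illustration of how reference keys can provide deletion protection, the sketch below counts keys under a reference prefix with an ordered range scan. It is a simplification under stated assumptions: the KV store is modeled as a `std::map`, and `reference_prefix`, `count_references`, and `can_delete_immediately` are hypothetical names with a human-readable key layout, not the encoded keys produced by keys.h.

```cpp
#include <cstddef>
#include <map>
#include <string>

// Hypothetical, human-readable stand-in for the encoded reference key:
// "snapshot/<instance_id>/reference/<versionstamp>/<ref_instance_id>".
// The real layout is produced by the helpers in keys.h.
std::string reference_prefix(const std::string& instance_id, const std::string& versionstamp) {
    return "snapshot/" + instance_id + "/reference/" + versionstamp + "/";
}

// Count keys that start with `prefix`: seek to the first candidate, then walk
// forward while the prefix still matches (an ordered range scan).
std::size_t count_references(const std::map<std::string, std::string>& kv,
                             const std::string& prefix) {
    std::size_t n = 0;
    for (auto it = kv.lower_bound(prefix);
         it != kv.end() && it->first.compare(0, prefix.size(), prefix) == 0; ++it) {
        ++n;
    }
    return n;
}

// A snapshot can be physically deleted only when no instance was derived from it.
bool can_delete_immediately(const std::map<std::string, std::string>& kv,
                            const std::string& instance_id,
                            const std::string& versionstamp) {
    return count_references(kv, reference_prefix(instance_id, versionstamp)) == 0;
}
```

With two reference keys stored under one snapshot, `count_references` returns 2 and `can_delete_immediately` returns false for that snapshot.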
Existing utility functions:
```cpp
// keys.h (L539-545)
versioned::snapshot_full_key({instance_id})
versioned::snapshot_reference_key({instance_id, versionstamp, ref_instance_id})
versioned::snapshot_reference_key_prefix(instance_id)
versioned::snapshot_key_prefix(instance_id)
```
2.3 Snapshot State Machine Confirmation
```protobuf
// cloud.proto (L821-830)
enum SnapshotStatus {
    SNAPSHOT_PREPARE = 0;  // Creating
    SNAPSHOT_NORMAL = 1;   // Ready
    SNAPSHOT_ABORTED = 2;  // Aborted
    SNAPSHOT_RECYCLED = 3; // Marked for recycling
}
```
State transitions: PREPARE → NORMAL (success) | PREPARE → ABORTED (failure) | NORMAL → RECYCLED (deletion/expiration)
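The three transitions listed above could be guarded by a small check like the following. This is only a sketch: the enum is a local mirror of the quoted values rather than the generated proto type, and `is_valid_transition` is a hypothetical helper. ABORTED and RECYCLED are treated as terminal here because the recycler cleans them up rather than transitioning them.

```cpp
// Local mirror of the SnapshotStatus values quoted above (not the generated proto type).
enum class SnapshotStatus { PREPARE = 0, NORMAL = 1, ABORTED = 2, RECYCLED = 3 };

// Returns true only for the three transitions listed in section 2.3.
constexpr bool is_valid_transition(SnapshotStatus from, SnapshotStatus to) {
    switch (from) {
    case SnapshotStatus::PREPARE:
        return to == SnapshotStatus::NORMAL || to == SnapshotStatus::ABORTED;
    case SnapshotStatus::NORMAL:
        return to == SnapshotStatus::RECYCLED;
    case SnapshotStatus::ABORTED:
    case SnapshotStatus::RECYCLED:
        return false; // terminal states: cleaned up, never transitioned
    }
    return false;
}
```

Calling this before each status write would reject, for example, a late commit on an already aborted snapshot.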
2.4 Existing RPC Parameter Validation Logic
MetaServiceImpl has already completed unified cloud_unique_id → instance_id conversion and rate limiting. Subclass implementations do not need to repeat this:
```cpp
// meta_service_snapshot.cpp (L27-49), using begin_snapshot as an example
void MetaServiceImpl::begin_snapshot(...) {
    RPC_PREPROCESS(begin_snapshot, get, put, del);
    // cloud_unique_id validation
    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
    RPC_RATE_LIMIT(begin_snapshot);
    // Delegate to SnapshotManager
    snapshot_manager_->begin_snapshot(instance_id, *request, response);
}
```
2.5 Object Storage Access Interface Confirmation
StorageVaultAccessor provides a complete storage operation abstraction:
```cpp
// storage_vault_accessor.h (L51-97)
class StorageVaultAccessor {
    virtual int delete_prefix(const std::string& path_prefix, int64_t expiration_time = 0) = 0;
    virtual int delete_directory(const std::string& dir_path) = 0;
    virtual int delete_file(const std::string& path) = 0;
    virtual int list_directory(const std::string& dir_path,
                               std::unique_ptr<ListIterator>* res) = 0;
    virtual int put_file(const std::string& path, const std::string& content) = 0;
    virtual int exists(const std::string& path) = 0;
    virtual int abort_multipart_upload(const std::string& path,
                                       const std::string& upload_id) = 0;
};
```
The abort_multipart_upload method is built-in and can be used directly to clean up multipart uploads during abort_snapshot.
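A minimal sketch of that cleanup step, assuming the upload path and ID were recorded earlier by update_snapshot. `MultipartAborter` is a hypothetical one-method slice of the interface above (only the `abort_multipart_upload` signature is taken from the source), and `cleanup_partial_upload` and `RecordingAborter` are illustrative names.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical one-method slice of the accessor interface quoted above.
struct MultipartAborter {
    virtual ~MultipartAborter() = default;
    virtual int abort_multipart_upload(const std::string& path,
                                       const std::string& upload_id) = 0;
};

// Aborts a pending multipart upload if one was recorded. Returns 0 on success
// or when there is nothing to clean up, otherwise the accessor's error code.
int cleanup_partial_upload(MultipartAborter& accessor, const std::string& upload_file,
                           const std::string& upload_id) {
    if (upload_file.empty() || upload_id.empty()) {
        return 0; // no upload was ever recorded for this snapshot
    }
    return accessor.abort_multipart_upload(upload_file, upload_id);
}

// Test double that records calls instead of talking to object storage.
struct RecordingAborter : MultipartAborter {
    std::vector<std::pair<std::string, std::string>> calls;
    int abort_multipart_upload(const std::string& path,
                               const std::string& upload_id) override {
        calls.emplace_back(path, upload_id);
        return 0;
    }
};
```

Skipping the call when no upload was recorded keeps abort_snapshot usable for snapshots that failed before the upload started.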
2.6 FE Configuration Items Status
| Config Item | Default Value | Description |
|---|---|---|
| cloud_snapshot_handler_class | "...CloudSnapshotHandler" | Handler implementation class name |
| cloud_snapshot_handler_interval_second | 3600 | Background scheduling interval (seconds) |
| cloud_snapshot_timeout_seconds | 600 | PREPARE state timeout (seconds) |
| cloud_auto_snapshot_max_reversed_num | 35 | Maximum retained auto-snapshots |
| cloud_auto_snapshot_min_interval_seconds | 3600 | Minimum auto-snapshot interval (seconds) |
| multi_part_upload_part_size_in_bytes | 256MB | Multipart upload chunk size |
| multi_part_upload_max_seconds | 3600 | Multipart upload timeout (seconds) |
| multi_part_upload_pool_size | 10 | Multipart upload thread pool size |
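To make the sizing and timeout settings concrete, here is a small arithmetic sketch. It assumes that "256MB" means 256 MiB and that timestamps are UNIX seconds; `part_count`, `prepare_timed_out`, and `kMiB` are hypothetical names, not existing Doris helpers.

```cpp
#include <cstdint>

constexpr std::int64_t kMiB = 1024 * 1024;

// Number of parts needed to upload `image_bytes` in chunks of `part_bytes`
// (ceiling division). Returns 0 for an empty image or an invalid part size.
constexpr std::int64_t part_count(std::int64_t image_bytes, std::int64_t part_bytes) {
    if (image_bytes <= 0 || part_bytes <= 0) return 0;
    return (image_bytes + part_bytes - 1) / part_bytes;
}

// True once a PREPARE snapshot has outlived its timeout (all values in seconds).
constexpr bool prepare_timed_out(std::int64_t create_at, std::int64_t timeout_seconds,
                                 std::int64_t now) {
    return now > create_at + timeout_seconds;
}
```

Under these assumptions a 1 GiB image splits into 4 parts, and a single extra byte adds a fifth part. With the default timeout of 600 seconds, a snapshot created at t=1000 counts as timed out from t=1601 onward.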
3. Technical Constraints & Design Principles
3.1 Hard Constraints
| ID | Constraint | Impact |
|---|---|---|
| C-1 | SnapshotManager base class txn_kv_ is private | Subclass must receive TxnKv reference via constructor, or base class must be changed to protected |
| C-2 | Versionstamp depends on FDB transaction commit | begin_snapshot must use atomic versionstamp operations within an FDB transaction |
| C-3 | CloudSnapshotHandler loaded via reflection | Subclass must have a no-argument constructor |
| C-4 | MetaServiceImpl RPC preprocessing already completes parameter validation | SnapshotManager methods do not need to re-validate cloud_unique_id |
| C-5 | Proto message definitions cannot be modified | All field semantics are finalized; no fields may be added or removed |
| C-6 | MasterDaemon.runOneCycle() waits for Catalog readiness | runAfterCatalogReady() executes after FE image loading is complete |
| C-7 | snapshot_full_key uses encode_versioned_key encoding | Range scans must use snapshot_key_prefix + range queries |
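Constraint C-1 can be worked around without touching the base class, roughly as sketched below. `TxnKv` and `SnapshotManager` are reduced to stand-in types so the example compiles on its own, and the `txn_kv()` accessor is added only for illustration.

```cpp
#include <memory>
#include <utility>

struct TxnKv {}; // stand-in for the real transactional KV client

// Stand-in base class: txn_kv_ is private, as constraint C-1 describes.
class SnapshotManager {
public:
    explicit SnapshotManager(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {}
    virtual ~SnapshotManager() = default;

private:
    std::shared_ptr<TxnKv> txn_kv_;
};

// The subclass forwards one copy of the pointer to the base and keeps its own.
class DorisSnapshotManager : public SnapshotManager {
public:
    explicit DorisSnapshotManager(std::shared_ptr<TxnKv> txn_kv)
            : SnapshotManager(txn_kv), txn_kv_(std::move(txn_kv)) {}

    const std::shared_ptr<TxnKv>& txn_kv() const { return txn_kv_; }

private:
    std::shared_ptr<TxnKv> txn_kv_; // independently held reference
};
```

The cost is one extra shared_ptr copy per manager. Changing the base member to protected would avoid the copy, but that modifies framework code.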
3.2 Design Trade-offs
| Trade-off | Decision | Rationale |
|---|---|---|
| Subclass vs. direct base class modification | Inherit via subclass | Aligns with framework design intent; does not affect other consumers |
| FE Image upload: synchronous vs. asynchronous | Asynchronous thread pool | Large Image uploads may take tens of minutes; cannot block the main thread |
| Snapshot concurrency: parallel vs. serial | Serial (single-thread pool) | Avoids consistency issues from concurrent checkpoints |
| Versionstamp generation: client vs. server | Server-side (FDB atomic op) | Ensures globally monotonic ordering; avoids clock drift |
| Auto-snapshot expiration: scheduled scan vs. event-driven | Scheduled scan | Reuses MasterDaemon scheduling framework; simple and reliable |
| Clone types: READ_ONLY only vs. all | Implement all | Proto already defines READ_ONLY/WRITABLE/ROLLBACK; full support required |
4. Core Implementation Plan
4.1 Layered Implementation Strategy
```mermaid
graph TB
    subgraph "Phase 1: C++ SnapshotManager Core Implementation"
        A1["DorisSnapshotManager extends SnapshotManager"]
        A2["begin_snapshot: validation → build SnapshotPB → FDB atomic write"]
        A3["update_snapshot: update upload_file/upload_id"]
        A4["commit_snapshot: PREPARE→NORMAL + write finish_at/sizes"]
        A5["abort_snapshot: →ABORTED + abort multipart upload"]
        A6["drop_snapshot: →RECYCLED"]
        A7["list_snapshot: range scan + SnapshotInfoPB conversion"]
        A8["clone_instance: metadata clone + reference recording"]
    end
    subgraph "Phase 2: C++ Operational Capabilities"
        B1["recycle_snapshots: TTL + max_reserved cleanup"]
        B2["recycle_snapshot_meta_and_data: object store + KV dual deletion"]
        B3["check_snapshots / inverted_check: consistency verification"]
        B4["set_multi_version_status: MVCC state management"]
        B5["migrate_to_versioned_keys: key migration"]
        B6["compact_snapshot_chains: snapshot chain compression"]
    end
    subgraph "Phase 3: Java CloudSnapshotHandler Implementation"
        C1["DorisCloudSnapshotHandler extends CloudSnapshotHandler"]
        C2["initialize(): load persisted state + init S3 client"]
        C3["runAfterCatalogReady(): auto-snapshot trigger check"]
        C4["submitJob(ttl, label): five-step workflow"]
        C5["refreshAutoSnapshotJob(): reload scheduling config"]
        C6["cloneSnapshot(file): restore FE from snapshot"]
    end
    subgraph "Phase 4: Testing + Integration"
        D1["C++ unit tests: cover all SnapshotManager methods"]
        D2["Java unit tests: mock MetaServiceProxy"]
        D3["Integration tests: end-to-end SQL command verification"]
        D4["Regression tests: regression-test/suites"]
    end
    A1 --> A2 --> A3 --> A4 --> A5 --> A6 --> A7 --> A8
    A8 --> B1 --> B2 --> B3 --> B4 --> B5 --> B6
    B6 --> C1 --> C2 --> C3 --> C4 --> C5 --> C6
    C6 --> D1 --> D2 --> D3 --> D4
```
4.2 Snapshot Lifecycle Core Workflow
```mermaid
sequenceDiagram
    participant User as User/Scheduler
    participant Handler as DorisCloudSnapshotHandler
    participant Proxy as MetaServiceProxy
    participant SnapMgr as DorisSnapshotManager
    participant FDB as FoundationDB
    participant ObjStore as Object Storage
    User->>Handler: submitJob(ttl, label)
    Handler->>Proxy: beginSnapshot(cloud_unique_id, label, timeout, ttl)
    Proxy->>SnapMgr: begin_snapshot(instance_id, req, resp)
    SnapMgr->>FDB: atomic write SnapshotPB{PREPARE} + get versionstamp
    SnapMgr-->>Handler: {snapshot_id, image_url, obj_info}
    Handler->>Handler: Catalog.checkpoint() to get last_journal_id
    Handler->>ObjStore: multipart upload FE Image to image_url
    loop Each part
        Handler->>Proxy: updateSnapshot(snapshot_id, upload_file, upload_id)
        Proxy->>SnapMgr: update_snapshot()
        SnapMgr->>FDB: update SnapshotPB.upload_file/upload_id
    end
    Handler->>Proxy: commitSnapshot(snapshot_id, image_url, journal_id, sizes)
    Proxy->>SnapMgr: commit_snapshot()
    SnapMgr->>FDB: update SnapshotPB{NORMAL, finish_at}
    Note over Handler,ObjStore: Error path
    Handler->>Proxy: abortSnapshot(snapshot_id, reason)
    Proxy->>SnapMgr: abort_snapshot()
    SnapMgr->>FDB: update SnapshotPB{ABORTED}
    SnapMgr->>ObjStore: abort_multipart_upload(path, upload_id)
```
5. C++ DorisSnapshotManager Method-Level Design
5.1 Class Definition
File: cloud/src/snapshot/doris_snapshot_manager.h
```cpp
class DorisSnapshotManager : public SnapshotManager {
public:
    DorisSnapshotManager(std::shared_ptr<TxnKv> txn_kv);
    ~DorisSnapshotManager() override = default;

    void begin_snapshot(std::string_view instance_id, const BeginSnapshotRequest& request,
                        BeginSnapshotResponse* response) override;
    void update_snapshot(std::string_view instance_id, const UpdateSnapshotRequest& request,
                         UpdateSnapshotResponse* response) override;
    void commit_snapshot(std::string_view instance_id, const CommitSnapshotRequest& request,
                         CommitSnapshotResponse* response) override;
    void abort_snapshot(std::string_view instance_id, const AbortSnapshotRequest& request,
                        AbortSnapshotResponse* response) override;
    void drop_snapshot(std::string_view instance_id, const DropSnapshotRequest& request,
                       DropSnapshotResponse* response) override;
    void list_snapshot(std::string_view instance_id, const ListSnapshotRequest& request,
                       ListSnapshotResponse* response) override;
    void clone_instance(const CloneInstanceRequest& request,
                        CloneInstanceResponse* response) override;

    std::pair<MetaServiceCode, std::string> set_multi_version_status(
            std::string_view instance_id, MultiVersionStatus multi_version_status) override;

    int check_snapshots(InstanceChecker* checker) override;
    int inverted_check_snapshots(InstanceChecker* checker) override;
    int check_mvcc_meta_key(InstanceChecker* checker) override;
    int inverted_check_mvcc_meta_key(InstanceChecker* checker) override;
    int check_meta(MetaChecker* meta_checker) override;

    int recycle_snapshots(InstanceRecycler* recycler) override;
    int recycle_snapshot_meta_and_data(std::string_view instance_id,
                                       std::string_view resource_id,
                                       StorageVaultAccessor* accessor,
                                       Versionstamp snapshot_version,
                                       const SnapshotPB& snapshot_pb) override;

    int migrate_to_versioned_keys(InstanceDataMigrator* migrator) override;
    int compact_snapshot_chains(InstanceChainCompactor* compactor) override;

private:
    std::shared_ptr<TxnKv> txn_kv_; // Independently held reference

    // Internal helper methods
    int read_snapshot_pb(std::string_view instance_id, const Versionstamp& vs, SnapshotPB* pb);
    int write_snapshot_pb(std::string_view instance_id, const Versionstamp& vs,
                          const SnapshotPB& pb);
    int read_instance_info(std::string_view instance_id, InstanceInfoPB* info);
    int get_storage_vault_accessor(const InstanceInfoPB& instance,
                                   const std::string& resource_id,
                                   std::unique_ptr<StorageVaultAccessor>* accessor);
    bool validate_ip_address(const std::string& ip);
    std::string build_image_url(const InstanceInfoPB& instance, const std::string& snapshot_id);
};
```
5.2 Method Implementation Specifications
| Method | Key Input Fields | Core Implementation Logic | Key Return Fields | Error Codes |
|---|---|---|---|---|
| begin_snapshot | snapshot_label, timeout_seconds, ttl_seconds, auto_snapshot, request_ip | 1. Validate parameters (timeout>0, ttl>0, label non-empty); 2. Optionally validate IP format (IPv4/IPv6); 3. Read InstanceInfoPB to obtain obj_info or storage_vault config; 4. Use FDB atomic versionstamp operation to write SnapshotPB{PREPARE, create_at=now()}; 5. Construct image_url = <prefix>/snapshot/<snapshot_id>/image/ | snapshot_id, image_url, obj_info | INVALID_ARGUMENT / ALREADY_EXISTED |
| update_snapshot | snapshot_id, upload_file, upload_id | 1. Parse snapshot_id to Versionstamp; 2. Read SnapshotPB, verify status==PREPARE; 3. Update upload_file / upload_id; 4. Write back to TxnKv | status only | INVALID_ARGUMENT / SNAPSHOT_NOT_FOUND |
| commit_snapshot | snapshot_id, image_url, last_journal_id, snapshot_meta_image_size, snapshot_logical_data_size | 1. Read SnapshotPB, verify status==PREPARE; 2. Timeout check now < create_at + timeout_seconds; 3. Update status to NORMAL, write finish_at, last_journal_id, sizes; 4. Write back to TxnKv | status only | SNAPSHOT_EXPIRED / SNAPSHOT_NOT_FOUND |
| abort_snapshot | snapshot_id, reason | 1. Read SnapshotPB; 2. If upload_file/upload_id exist, abort multipart upload via StorageVaultAccessor; 3. Status→ABORTED, write reason; 4. Write back to TxnKv | status only | SNAPSHOT_NOT_FOUND |
| drop_snapshot | snapshot_id | 1. Read SnapshotPB; 2. Only allow deletion of NORMAL/ABORTED snapshots; 3. Check if reference keys exist (if derived instances exist, only mark RECYCLED; otherwise can clean immediately) | status only | INVALID_ARGUMENT / SNAPSHOT_NOT_FOUND |
| list_snapshot | required_snapshot_id, include_aborted, instance_id | 1. Range scan TxnKv with snapshot_key_prefix(instance_id) as prefix; 2. Deserialize each SnapshotPB to SnapshotInfoPB; 3. Query reference keys to populate derived_instance_ids; 4. Filter by include_aborted; 5. Sort by creation time descending | repeated SnapshotInfoPB | - |
| clone_instance | from_snapshot_id, from_instance_id, new_instance_id, clone_type | 1. Read source snapshot SnapshotPB (verify NORMAL); 2. Read source instance InstanceInfoPB; 3. Construct new instance metadata by clone_type; 4. Write snapshot_reference_key to record derivation; 5. Write new instance InstanceInfoPB | obj_info, image_url | INVALID_ARGUMENT / SNAPSHOT_NOT_FOUND |
| recycle_snapshots | InstanceRecycler* | Scan all SnapshotPBs: a) TTL expired (now > create_at + ttl_seconds) → mark RECYCLED; b) status==RECYCLED → call recycle_snapshot_meta_and_data(); c) NORMAL count > max_reserved → mark oldest as RECYCLED | 0/non-zero | - |
| recycle_snapshot_meta_and_data | instance_id, resource_id, StorageVaultAccessor*, Versionstamp, SnapshotPB | 1. accessor->delete_directory(image_url) to delete object storage files; 2. Delete snapshot_full_key from TxnKv; 3. Delete all associated snapshot_reference_keys | 0/non-zero | - |
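As one example of how a row in this table might translate to code, the sketch below applies the commit_snapshot rules to an in-memory record. The `Snapshot` struct is a hypothetical stand-in for the SnapshotPB fields involved, the KV read and write are omitted, and returning INVALID_ARGUMENT for a non-PREPARE snapshot is an assumption, since the table does not name a code for that case.

```cpp
#include <cstdint>

enum class Status { PREPARE, NORMAL, ABORTED, RECYCLED };
enum class Code { OK, INVALID_ARGUMENT, SNAPSHOT_EXPIRED };

// Hypothetical in-memory stand-in for the SnapshotPB fields used by commit.
struct Snapshot {
    Status status = Status::PREPARE;
    std::int64_t create_at = 0;       // UNIX seconds
    std::int64_t timeout_seconds = 0;
    std::int64_t finish_at = 0;
    std::int64_t last_journal_id = 0;
};

// Applies the commit rules from the table; mutates `snap` only on success.
Code commit(Snapshot& snap, std::int64_t last_journal_id, std::int64_t now) {
    if (snap.status != Status::PREPARE) {
        return Code::INVALID_ARGUMENT; // assumed code for a wrong-state commit
    }
    if (now >= snap.create_at + snap.timeout_seconds) {
        return Code::SNAPSHOT_EXPIRED; // commit must satisfy now < create_at + timeout
    }
    snap.status = Status::NORMAL;
    snap.finish_at = now;
    snap.last_journal_id = last_journal_id;
    return Code::OK;
}
```

In the real method, the read, the checks, and the write-back would need to share one transaction so that a concurrent abort cannot interleave with the commit.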
5.3 begin_snapshot Key Implementation (Simulated Walkthrough)
```text
Input: instance_id="inst_001", label="daily_backup", timeout=3600, ttl=86400, auto=true

Step 1: Validation
  - timeout_seconds=3600 > 0 ✓
  - ttl_seconds=86400 > 0 ✓
  - label="daily_backup" non-empty ✓

Step 2: Read Instance Info
  - key: instance_key({instance_id}) → InstanceInfoPB
  - Obtain obj_info[0] = {bucket="my-bucket", prefix="doris-data", endpoint="s3.cn-north.amazonaws.com"}
  - resource_id = obj_info[0].id

Step 3: FDB Atomic Write
  - snapshot_full_key = versioned::snapshot_full_key({instance_id})
  - Construct SnapshotPB:
      status = SNAPSHOT_PREPARE
      type = SNAPSHOT_REFERENCE
      instance_id = "inst_001"
      create_at = 1742313600 (current UNIX timestamp)
      timeout_seconds = 3600
      ttl_seconds = 86400
      auto = true
      label = "daily_backup"
      resource_id = obj_info[0].id
  - txn->atomic_set_versionstamped_key(snapshot_full_key, SnapshotPB)
  - txn->commit() → obtain versionstamp

Step 4: Construct Response
  - snapshot_id = serialize_snapshot_id(versionstamp) → "0a1b2c3d4e5f6789a0b1"
  - image_url = "doris-data/snapshot/0a1b2c3d4e5f6789a0b1/image/"
  - obj_info = instance's ObjectStoreInfoPB

Result: response = {snapshot_id, image_url, obj_info} ✓
```
5.4 clone_instance Three Clone Types Implementation
| Field | READ_ONLY | WRITABLE | ROLLBACK |
|---|---|---|---|
| ready_only | true | false | false |
| source_instance_id | Source instance ID | Source instance ID | Source instance ID |
| source_snapshot_id | Snapshot ID | Snapshot ID | Snapshot ID |
| original_instance_id | - | - | Earliest source instance ID |
| successor_instance_id | - | - | Previous inherited instance ID |
| obj_info | Inherited from source | Uses new obj_info from request | Inherited from source |
| storage_vault | Inherited from source | Uses new vault from request | Inherited from source |
| Data visibility | Read-only source data | Independent copy | Rolled back to snapshot point |
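The table above can be read as a small mapping from clone type to instance metadata. The sketch below covers only the read-only flag, the source IDs, and the obj_info choice. `CloneMeta` and `build_clone_meta` are hypothetical, obj_info is reduced to a single string, the `read_only` member stands in for the flag in the table's first row, and the ROLLBACK-only fields (original_instance_id, successor_instance_id) are left out.

```cpp
#include <string>

enum class CloneType { READ_ONLY, WRITABLE, ROLLBACK };

// Hypothetical subset of the new instance's metadata.
struct CloneMeta {
    bool read_only = false;
    std::string source_instance_id;
    std::string source_snapshot_id;
    std::string obj_info; // simplified to a single identifier
};

CloneMeta build_clone_meta(CloneType type, const std::string& source_instance_id,
                           const std::string& snapshot_id,
                           const std::string& source_obj_info,
                           const std::string& requested_obj_info) {
    CloneMeta meta;
    meta.read_only = (type == CloneType::READ_ONLY);
    meta.source_instance_id = source_instance_id;
    meta.source_snapshot_id = snapshot_id;
    // Only WRITABLE clones take storage from the request; the others inherit it.
    meta.obj_info = (type == CloneType::WRITABLE) ? requested_obj_info : source_obj_info;
    return meta;
}
```

Keeping the per-type differences in one function makes it easier to test all three clone types against the table.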
6. Java DorisCloudSnapshotHandler Detailed Design
6.1 Class Definition
File: fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/DorisCloudSnapshotHandler.java
```java
public class DorisCloudSnapshotHandler extends CloudSnapshotHandler {
    private ExecutorService snapshotExecutor;        // Single-thread serial execution
    private volatile SnapshotState currentSnapshot;  // Currently in-progress snapshot

    @Override
    public void initialize() {
        snapshotExecutor = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder().setNameFormat("snapshot-worker-%d").setDaemon(true).build());
        // Load FE persisted snapshot state (if any)
    }

    @Override
    protected void runAfterCatalogReady() { /* Auto-snapshot trigger logic */ }

    @Override
    public void submitJob(long ttl, String label) throws Exception { /* Manual snapshot submission */ }

    @Override
    public synchronized void refreshAutoSnapshotJob() throws Exception { /* Reload auto-snapshot config */ }

    @Override
    public void cloneSnapshot(String clusterSnapshotFile) throws Exception { /* Restore from snapshot */ }
}
```
6.2 Method Implementation Specifications
| Method | Core Implementation Logic |
|---|---|
| initialize() | 1. Create single-thread snapshotExecutor; 2. Initialize object storage client (based on AK/SK/Endpoint from obj_info); 3. Load FE persisted SnapshotState (if an in-progress snapshot exists, check whether abort is needed) |
| runAfterCatalogReady() | 1. Execute only on Master (Env.isMaster() check); 2. Call MetaServiceProxy.getInstance().getSnapshotProperties() to get switch_status / max_reserved / interval; 3. Skip if switch_status != SNAPSHOT_SWITCH_ON; 4. Call listSnapshot(false) to get NORMAL list; 5. Trigger if now - latest snapshot time >= snapshot_interval_seconds; 6. Abort PREPARE snapshots exceeding cloud_snapshot_timeout_seconds |
| submitJob(ttl, label) | 1. Verify no in-progress PREPARE snapshot; 2. Submit async task to snapshotExecutor; 3. Async task executes five-step workflow: beginSnapshot → checkpoint → multipartUpload → updateSnapshot → commitSnapshot; 4. Call abortSnapshot on any step failure |
| refreshAutoSnapshotJob() | 1. Read latest auto-snapshot configuration from MetaService; 2. If scheduling interval changed, call setInterval() to update MasterDaemon.intervalMs |
| cloneSnapshot(file) | 1. Parse clusterSnapshotFile to obtain snapshot_id; 2. Call MetaServiceProxy.cloneInstance(); 3. Download FE Image locally from obj_info + image_url; 4. FE framework automatically rebuilds metadata from the Image |
6.3 FE Image Upload Five-Step Workflow
```text
Step 1: beginSnapshot(cloud_unique_id, label, timeout=600, ttl)
  → Obtain snapshot_id, image_url, obj_info
Step 2: Catalog.checkpoint()
  → Obtain last_journal_id, image file path
Step 3: S3 Multipart Upload Initialization
  → Create S3Client based on obj_info
  → initiateMultipartUpload(image_url + "image.xxx")
  → Obtain upload_id
Step 4: updateSnapshot(snapshot_id, upload_file, upload_id)
  → Record upload info for fault recovery
Step 5: Chunked Upload
  → Split by multi_part_upload_part_size_in_bytes (256MB)
  → Parallel upload (pool size = multi_part_upload_pool_size)
  → completeMultipartUpload
Step 6: commitSnapshot(snapshot_id, image_url, last_journal_id, meta_size, data_size)
  → Snapshot complete
```
6.4 Auto-Snapshot Trigger Decision Flow
```mermaid
flowchart TD
    A["runAfterCatalogReady() triggered every interval seconds"] --> B{Env.isMaster?}
    B -- No --> Z["Skip"]
    B -- Yes --> C["getSnapshotProperties()"]
    C --> D{switch_status == ON?}
    D -- No --> Z
    D -- Yes --> E["listSnapshot(false) get NORMAL list"]
    E --> F{NORMAL list empty?}
    F -- Yes --> G["Trigger snapshot immediately"]
    F -- No --> H{now - latest.finish_at >= interval?}
    H -- No --> I["Check PREPARE timeout"]
    H -- Yes --> J{NORMAL count >= max_reserved?}
    J -- No --> G
    J -- Yes --> K["Recycle oldest NORMAL snapshot"]
    K --> G
    I --> L{PREPARE timed out?}
    L -- Yes --> M["abortSnapshot + trigger new snapshot"]
    L -- No --> Z
    G --> N["submitJob(auto_ttl, auto_label)"]
```
6.5 Configuration Injection
The FE side needs to update the cloud_snapshot_handler_class default value in Config.java (or configure via fe.conf):
```properties
# fe.conf
cloud_snapshot_handler_class = org.apache.doris.cloud.snapshot.DorisCloudSnapshotHandler
```
7. Recycling & Cleanup Mechanism
7.1 Recycling Trigger Conditions
| Condition | Judgment Logic | Action |
|---|---|---|
| TTL expired (manual snapshot) | now > snapshot.create_at + snapshot.ttl_seconds | Mark RECYCLED |
| Exceeds max retention (auto snapshot) | NORMAL count > instance.max_reserved_snapshot | Mark oldest NORMAL as RECYCLED |
| Already marked RECYCLED | snapshot.status == SNAPSHOT_RECYCLED | Execute actual cleanup |
| ABORTED state | snapshot.status == SNAPSHOT_ABORTED | Clean up residual uploads + delete KV |
| PREPARE timeout | now > snapshot.create_at + snapshot.timeout_seconds | Mark ABORTED |
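The per-snapshot rows of this table could be expressed as one classification function, sketched below. The max-retention rule is left out because it depends on the whole snapshot list, and the TTL rule is applied to every NORMAL snapshot for simplicity. `Snapshot`, `Action`, and `classify` are hypothetical names.

```cpp
#include <cstdint>

enum class Status { PREPARE, NORMAL, ABORTED, RECYCLED };
enum class Action { NONE, MARK_ABORTED, MARK_RECYCLED, CLEANUP };

// Hypothetical subset of SnapshotPB; all times are in seconds.
struct Snapshot {
    Status status;
    std::int64_t create_at;
    std::int64_t timeout_seconds;
    std::int64_t ttl_seconds;
};

// Maps one snapshot to the recycler action implied by the table above.
Action classify(const Snapshot& s, std::int64_t now) {
    switch (s.status) {
    case Status::PREPARE:
        return now > s.create_at + s.timeout_seconds ? Action::MARK_ABORTED : Action::NONE;
    case Status::NORMAL:
        return now > s.create_at + s.ttl_seconds ? Action::MARK_RECYCLED : Action::NONE;
    case Status::ABORTED:
    case Status::RECYCLED:
        return Action::CLEANUP; // residual uploads, object files, and KV entries
    }
    return Action::NONE;
}
```

A recycler pass would call this for each scanned snapshot and then apply the retention limit to the NORMAL snapshots that remain.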
7.2 Cleanup Workflow
```mermaid
flowchart TD
    A["Recycler background instance scan"] --> B["InstanceRecycler.recycle()"]
    B --> C["DorisSnapshotManager.recycle_snapshots(recycler)"]
    C --> D["Range scan TxnKv: snapshot_key_prefix(instance_id)"]
    D --> E{Iterate each SnapshotPB}
    E --> F{TTL expired or PREPARE timeout?}
    F -- Yes --> G["Mark status as RECYCLED/ABORTED"]
    F -- No --> H{Status == RECYCLED or ABORTED?}
    H -- Yes --> I["recycle_snapshot_meta_and_data()"]
    I --> J["accessor->delete_directory(image_url)"]
    I --> K["Delete snapshot_full_key"]
    I --> L["Delete associated snapshot_reference_keys"]
    H -- No --> M{NORMAL count > max_reserved?}
    M -- Yes --> N["Mark oldest NORMAL as RECYCLED"]
    N --> I
    M -- No --> O["Skip"]
    E --> P{Continue to next?}
    P -- Yes --> E
    P -- No --> Q["Done"]
```
8. Consistency Verification Design
8.1 Forward Verification (check_snapshots)
Goal: Ensure all NORMAL-state SnapshotPBs in TxnKv have corresponding files in object storage.
```text
for each SnapshotPB where status == NORMAL:
    if accessor->exists(snapshot.image_url) != 0:
        LOG(ERROR) << "Snapshot " << snapshot_id << " image not found at " << image_url
        error_count++
```
8.2 Inverse Verification (inverted_check_snapshots)
Goal: Ensure all files under the snapshot path in object storage have corresponding SnapshotPB metadata.
```text
accessor->list_directory("snapshot/")
for each file in listing:
    extract snapshot_id from path
    if read_snapshot_pb(instance_id, parse_versionstamp(snapshot_id)) fails:
        LOG(ERROR) << "Orphan snapshot file: " << file.path
        orphan_count++
```
8.3 MVCC Metadata Verification (check_mvcc_meta_key / inverted_check_mvcc_meta_key)
Verifies consistency of multi-version keys (0x03 keyspace) for partition/tablet/rowset metadata across the snapshot chain.
9. Injection Point Modification Checklist
9.1 meta_server.cpp Modification
```cpp
// Before:
auto snapshot_mgr = std::make_shared<SnapshotManager>(txn_kv_);
// After:
auto snapshot_mgr = std::make_shared<DorisSnapshotManager>(txn_kv_);
```
9.2 recycler.cpp Modification
```cpp
// Before:
snapshot_manager_ = std::make_shared<SnapshotManager>(txn_kv_);
// After:
snapshot_manager_ = std::make_shared<DorisSnapshotManager>(txn_kv_);
```
9.3 CMakeLists.txt Modification
Add new source file in cloud/CMakeLists.txt:
```cmake
set(SNAPSHOT_SRCS
    src/snapshot/snapshot_manager.cpp
    src/snapshot/doris_snapshot_manager.cpp # New
)
```
10. File Change Manifest
10.1 New C++ Files
| File | Description |
|---|---|
| cloud/src/snapshot/doris_snapshot_manager.h | DorisSnapshotManager class definition |
| cloud/src/snapshot/doris_snapshot_manager.cpp | All 18 method implementations |
| cloud/test/doris_snapshot_manager_test.cpp | Complete unit tests |
10.2 Modified C++ Files
| File | Modification |
|---|---|
| cloud/src/meta-service/meta_server.cpp | L82: SnapshotManager → DorisSnapshotManager |
| cloud/src/recycler/recycler.cpp | L594: SnapshotManager → DorisSnapshotManager |
| cloud/CMakeLists.txt | Add doris_snapshot_manager.cpp to build list |
10.3 New Java Files
| File | Description |
|---|---|
| fe/fe-core/src/main/java/org/apache/doris/cloud/snapshot/DorisCloudSnapshotHandler.java | Handler implementation |
| fe/fe-core/src/test/java/org/apache/doris/cloud/snapshot/DorisCloudSnapshotHandlerTest.java | Unit tests |
10.4 Modified Java Files
| File | Modification |
|---|---|
| fe/fe-common/src/main/java/org/apache/doris/common/Config.java | Update cloud_snapshot_handler_class default value |
10.5 Test Files
| File | Description |
|---|---|
| cloud/test/doris_snapshot_manager_test.cpp | C++ unit tests (all methods) |
| cloud/test/meta_service_snapshot_test.cpp | Enable and extend existing DISABLED tests |
| regression-test/suites/cloud_p0/snapshot/ | Groovy integration tests |
Use case
No response
Related issues
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct