Skip to content

Commit 3f46277

Browse files
committed
Pipelines: Add created_date and modified_date
1 parent 0b06d9f commit 3f46277

File tree

15 files changed

+415
-21
lines changed

15 files changed

+415
-21
lines changed

docs/changelog/130847.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
pr: 130847
2+
summary: "Pipelines: Add `created_date` and `modified_date`"
3+
area: Ingest Node
4+
type: enhancement
5+
issues: [108754]
6+
highlight:
7+
title: "Pipelines: Add `created_date` and `modified_date`"
8+
body: |-
9+
Pipelines now have extra properties to indicate when they were created, and updated, to improve visibility
10+
and debugging.
11+
These properties are managed by the system and cannot be updated by the user.
12+
notable: true

qa/mixed-cluster/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ excludeList.add('aggregations/percentiles_hdr_metric/Negative values test')
7171
// sync_id is removed in 9.0
7272
excludeList.add("cat.shards/10_basic/Help")
7373

74+
// new optional properties only available on latest cluster
75+
excludeList.add("ingest/10_basic/Test creating and getting pipeline returns created_date and modified_date")
76+
excludeList.add("ingest/10_basic/Test PUT setting created_date")
77+
excludeList.add("ingest/10_basic/Test PUT setting modified_date")
78+
excludeList.add("simulate.ingest/10_basic/Test simulate with pipeline with created_date")
79+
7480
def clusterPath = getPath()
7581

7682
buildParams.bwcVersions.withWireCompatible { bwcVersion, baseName ->

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,49 @@
184184
- is_false: first_pipeline.description
185185
- is_true: second_pipeline
186186
- is_false: second_pipeline.description
187+
188+
189+
---
190+
"Test creating and getting pipeline returns created_date and modified_date":
191+
- do:
192+
ingest.put_pipeline:
193+
id: "my_pipeline"
194+
body: >
195+
{
196+
"processors": []
197+
}
198+
- match: { acknowledged: true }
199+
200+
- do:
201+
ingest.get_pipeline:
202+
id: "my_pipeline"
203+
- match: { my_pipeline.created_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" }
204+
- match: { my_pipeline.modified_date: "/\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z/" }
205+
206+
---
207+
"Test PUT setting created_date":
208+
- do:
209+
catch: bad_request
210+
ingest.put_pipeline:
211+
id: "my_pipeline"
212+
body: >
213+
{
214+
"processors": [],
215+
"created_date": "2025-07-04T12:50:48.415Z"
216+
}
217+
- match: { status: 400 }
218+
- match: { error.reason: "Provided a pipeline property which is managed by the system, e.g. `created_date`." }
219+
220+
---
221+
"Test PUT setting modified_date":
222+
- do:
223+
catch: bad_request
224+
ingest.put_pipeline:
225+
id: "my_pipeline"
226+
body: >
227+
{
228+
"processors": [],
229+
"modified_date": "2025-07-04T12:50:48.415Z"
230+
}
231+
- match: { status: 400 }
232+
- match: { error.reason: "Provided a pipeline property which is managed by the system, e.g. `created_date`." }

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/simulate.ingest/10_basic.yml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,3 +748,33 @@ setup:
748748
- match: { docs.1.doc._index: "index-2" }
749749
- match: { docs.1.doc._source.foo: "rab" }
750750
- match: { docs.1.doc.executed_pipelines: ["my-pipeline"] }
751+
752+
---
753+
"Test simulate with pipeline with created_date":
754+
- skip:
755+
features: headers
756+
- do:
757+
catch: request
758+
headers:
759+
Content-Type: application/json
760+
simulate.ingest:
761+
pipeline: "my_pipeline"
762+
body: >
763+
{
764+
"docs": [
765+
{
766+
"_index": "index-1",
767+
"_source": {
768+
"foo": "bar"
769+
}
770+
}
771+
],
772+
"pipeline_substitutions": {
773+
"my_pipeline": {
774+
"processors": [],
775+
"created_date": "asd"
776+
}
777+
}
778+
}
779+
- match: { status: 500 }
780+
- match: { error.reason: "/(runtime_exception:\\ )?org\\.elasticsearch\\.ElasticsearchParseException:\\ Provided\\ a\\ pipeline\\ property\\ which\\ is\\ managed\\ by\\ the\\ system,\\ e\\.g\\.\\ `created_date`\\./" }
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest;
11+
12+
import org.elasticsearch.action.ingest.GetPipelineResponse;
13+
import org.elasticsearch.cluster.metadata.Metadata;
14+
import org.elasticsearch.node.NodeService;
15+
import org.elasticsearch.test.ESIntegTestCase;
16+
17+
import static org.hamcrest.Matchers.equalTo;
18+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
19+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
20+
21+
@ESIntegTestCase.ClusterScope(numDataNodes = 3)
22+
public class PipelineConfigurationSyncIT extends ESIntegTestCase {
23+
24+
public void testAllNodesGetPipelineTrackingClusterState() throws Exception {
25+
final String pipelineId = "_id";
26+
GetPipelineResponse getResponse = getPipelines(pipelineId);
27+
assertFalse(getResponse.isFound());
28+
29+
final long timeBeforePut = System.currentTimeMillis();
30+
putJsonPipeline(
31+
"_id",
32+
(builder, params) -> builder.field("description", "my_pipeline").startArray("processors").startObject().endObject().endArray()
33+
);
34+
final Long timeAfterPut = System.currentTimeMillis();
35+
36+
getResponse = getPipelines(pipelineId);
37+
assertTrue(getResponse.isFound());
38+
39+
final Pipeline node1Pipeline = internalCluster().getInstance(NodeService.class, "node_s0")
40+
.getIngestService()
41+
.getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id");
42+
final Pipeline node2Pipeline = internalCluster().getInstance(NodeService.class, "node_s1")
43+
.getIngestService()
44+
.getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id");
45+
final Pipeline node3Pipeline = internalCluster().getInstance(NodeService.class, "node_s2")
46+
.getIngestService()
47+
.getPipeline(Metadata.DEFAULT_PROJECT_ID, "_id");
48+
49+
assertNotSame(node1Pipeline, node2Pipeline);
50+
assertNotSame(node2Pipeline, node3Pipeline);
51+
52+
assertThat(node1Pipeline.getDescription(), equalTo(node2Pipeline.getDescription()));
53+
assertThat(node2Pipeline.getDescription(), equalTo(node3Pipeline.getDescription()));
54+
55+
// created_date
56+
final long node1CreatedDate = node1Pipeline.getCreatedDate().orElseThrow();
57+
final long node2CreatedDate = node2Pipeline.getCreatedDate().orElseThrow();
58+
final long node3CreatedDate = node3Pipeline.getCreatedDate().orElseThrow();
59+
60+
assertThat(node1CreatedDate, equalTo(node2CreatedDate));
61+
assertThat(node2CreatedDate, equalTo(node3CreatedDate));
62+
63+
assertThat(node1CreatedDate, greaterThanOrEqualTo(timeBeforePut));
64+
assertThat(node1CreatedDate, lessThanOrEqualTo(timeAfterPut));
65+
66+
// modified_date
67+
final long node1ModifiedDate = node1Pipeline.getModifiedDate().orElseThrow();
68+
final long node2ModifiedDate = node2Pipeline.getModifiedDate().orElseThrow();
69+
final long node3ModifiedDate = node3Pipeline.getModifiedDate().orElseThrow();
70+
71+
assertThat(node1ModifiedDate, equalTo(node2ModifiedDate));
72+
assertThat(node2ModifiedDate, equalTo(node3ModifiedDate));
73+
74+
assertThat(node1ModifiedDate, greaterThanOrEqualTo(timeBeforePut));
75+
assertThat(node1ModifiedDate, lessThanOrEqualTo(timeAfterPut));
76+
}
77+
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ static TransportVersion def(int id) {
338338
public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00);
339339
public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00);
340340
public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
341+
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_122_0_00);
341342

342343
/*
343344
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/ingest/ReservedPipelineAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public TransformState transform(ProjectId projectId, List<PutPipelineRequest> so
8787
ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId);
8888

8989
for (var request : requests) {
90-
var nopUpdate = IngestService.isNoOpPipelineUpdate(projectMetadata, request);
90+
var nopUpdate = IngestService.isNoOpPipelineUpdate(projectMetadata, request, () -> IngestService.readPipelineConfig(request));
9191

9292
if (nopUpdate) {
9393
continue;

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,18 @@ static void executeDocument(
4242
if (verbose) {
4343
List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
4444
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
45+
long createdDate = pipeline.getCreatedDate().orElse(-1);
46+
long modifiedDate = pipeline.getModifiedDate().orElse(-1);
4547
Pipeline verbosePipeline = new Pipeline(
4648
pipeline.getId(),
4749
pipeline.getDescription(),
4850
pipeline.getVersion(),
4951
pipeline.getMetadata(),
5052
verbosePipelineProcessor,
5153
pipeline.getFieldAccessPattern(),
52-
pipeline.getDeprecated()
54+
pipeline.getDeprecated(),
55+
createdDate == -1 ? null : createdDate,
56+
modifiedDate == -1 ? null : modifiedDate
5357
);
5458
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
5559
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@
8181
import org.elasticsearch.xcontent.XContentBuilder;
8282

8383
import java.io.IOException;
84+
import java.time.Instant;
85+
import java.time.temporal.ChronoUnit;
8486
import java.util.ArrayList;
8587
import java.util.Collection;
8688
import java.util.Collections;
@@ -102,6 +104,7 @@
102104
import java.util.function.Function;
103105
import java.util.function.IntConsumer;
104106
import java.util.function.Predicate;
107+
import java.util.function.Supplier;
105108
import java.util.stream.Collectors;
106109

107110
import static org.elasticsearch.core.Strings.format;
@@ -542,7 +545,9 @@ public void putPipeline(
542545
ActionListener<AcknowledgedResponse> listener,
543546
Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
544547
) throws Exception {
545-
if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request)) {
548+
Map<String, Object> newPipelineConfig = readPipelineConfig(request);
549+
validateNoSystemPropertiesInPipelineConfig(newPipelineConfig);
550+
if (isNoOpPipelineUpdate(state.metadata().getProject(projectId), request, () -> newPipelineConfig)) {
546551
// existing pipeline matches request pipeline -- no need to update
547552
listener.onResponse(AcknowledgedResponse.TRUE);
548553
return;
@@ -569,16 +574,35 @@ public void validatePipelineRequest(ProjectId projectId, PutPipelineRequest requ
569574
validatePipeline(ingestInfos, projectId, request.getId(), config);
570575
}
571576

572-
public static boolean isNoOpPipelineUpdate(ProjectMetadata metadata, PutPipelineRequest request) {
577+
public static Map<String, Object> readPipelineConfig(PutPipelineRequest request) {
578+
return XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
579+
}
580+
581+
public static void validateNoSystemPropertiesInPipelineConfig(final Map<String, Object> pipelineConfig) {
582+
if (pipelineConfig.containsKey(Pipeline.CREATED_DATE_KEY) || pipelineConfig.containsKey(Pipeline.MODIFIED_DATE_KEY)) {
583+
throw new ElasticsearchParseException("Provided a pipeline property which is managed by the system, e.g. `created_date`.");
584+
}
585+
}
586+
587+
public static boolean isNoOpPipelineUpdate(
588+
ProjectMetadata metadata,
589+
PutPipelineRequest request,
590+
Supplier<Map<String, Object>> newPipelineConfigSupplier
591+
) {
573592
IngestMetadata currentIngestMetadata = metadata.custom(IngestMetadata.TYPE);
574593
if (request.getVersion() == null
575594
&& currentIngestMetadata != null
576595
&& currentIngestMetadata.getPipelines().containsKey(request.getId())) {
577-
var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
578-
var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
579-
if (currentPipeline.getConfig().equals(pipelineConfig)) {
580-
return true;
581-
}
596+
597+
Map<String, Object> currentConfigWithoutSystemProps = new HashMap<>(
598+
currentIngestMetadata.getPipelines().get(request.getId()).getConfig()
599+
);
600+
currentConfigWithoutSystemProps.remove(Pipeline.CREATED_DATE_KEY);
601+
currentConfigWithoutSystemProps.remove(Pipeline.MODIFIED_DATE_KEY);
602+
603+
Map<String, Object> newPipelineConfig = newPipelineConfigSupplier.get();
604+
605+
return newPipelineConfig.equals(currentConfigWithoutSystemProps);
582606
}
583607

584608
return false;
@@ -750,7 +774,22 @@ public IngestMetadata execute(IngestMetadata currentIngestMetadata, Collection<I
750774
pipelines = new HashMap<>();
751775
}
752776

753-
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), pipelineSource, request.getXContentType()));
777+
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
778+
Map<String, Object> newPipelineConfig = XContentHelper.convertToMap(pipelineSource, true, request.getXContentType()).v2();
779+
PipelineConfiguration existingPipeline = pipelines.get(request.getId());
780+
if (existingPipeline == null) {
781+
newPipelineConfig.put(Pipeline.CREATED_DATE_KEY, now.toString());
782+
} else {
783+
Object existingCreatedAt = existingPipeline.getConfig().get(Pipeline.CREATED_DATE_KEY);
784+
// only set/carry over `created_date` if existing pipeline already has it.
785+
// would be confusing if existing pipelines were all updated to have `created_date` set to now.
786+
if (existingCreatedAt != null) {
787+
newPipelineConfig.put(Pipeline.CREATED_DATE_KEY, existingCreatedAt);
788+
}
789+
}
790+
newPipelineConfig.put(Pipeline.MODIFIED_DATE_KEY, now.toString());
791+
792+
pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), newPipelineConfig));
754793
return new IngestMetadata(pipelines);
755794
}
756795
}

0 commit comments

Comments
 (0)