Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,8 @@ <R> R mutableSideEffect(

int getVersion(String changeId, int minSupported, int maxSupported);

int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported);

void continueAsNew(ContinueAsNewInput input);

void registerQuery(RegisterQueryInput input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
return next.getVersion(changeId, minSupported, maxSupported);
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
return next.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
next.continueAsNew(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall
private Map<String, UpdateHandlerInfo> runningUpdateHandlers = new HashMap<>();
// Map of all running signal handlers. Key is the event Id of the signal event.
private Map<Long, SignalHandlerInfo> runningSignalHandlers = new HashMap<>();
// Current versions for the getVersion call that supports iterationId.
private final Map<String, Integer> currentVersions = new HashMap<>();

public SyncWorkflowContext(
@Nonnull String namespace,
Expand Down Expand Up @@ -991,6 +993,46 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
}
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
Integer currentVersion = currentVersions.get(seriesId);
// When replaying check if there is a marker (by calling getVersion) for each iteration.
if (isReplaying()) {
int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported);
if (currentVersion != null) {
if (iVersion < currentVersion) {
throw new IllegalArgumentException(
"getVersion for changeId '"
+ seriesId
+ "/"
+ iterationId
+ "' returned "
+ iVersion
+ " which is smaller than previously found version of "
+ currentVersion);
}
if (iVersion != DEFAULT_VERSION) {
currentVersions.put(seriesId, iVersion);
return iVersion;
}
return currentVersion;
}
return iVersion;
} else {
// When not replaying, only insert a marker (by calling getVersion) if the maxSupported is
// larger than the already recorded one.
if (currentVersion == null || (currentVersion != null && maxSupported > currentVersion)) {
int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported);
if (iVersion != maxSupported) {
throw new RuntimeException("getVersion returned wrong version: " + iVersion);
}
currentVersions.put(seriesId, iVersion);
return iVersion;
}
return currentVersion;
}
}

@Override
public void registerQuery(RegisterQueryInput request) {
queryDispatcher.registerQueryHandlers(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,13 @@ public static int getVersion(String changeId, int minSupported, int maxSupported
return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported);
}

public static int getVersion(
String seriesId, String iterationId, int minSupported, int maxSupported) {
assertNotReadOnly("get version");
return getWorkflowOutboundInterceptor()
.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

public static <V> Promise<Void> promiseAllOf(Iterable<Promise<V>> promises) {
return new AllOfPromise(promises);
}
Expand Down
86 changes: 86 additions & 0 deletions temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,92 @@ public static int getVersion(String changeId, int minSupported, int maxSupported
return WorkflowInternal.getVersion(changeId, minSupported, maxSupported);
}

/**
* Used to perform workflow safe code changes in code that is called repeatedly like body of a
* loop or a signal handler.
*
* <p>Consider the following example:
*
* <pre>
* for (int i=0; i<100; i++) {
* if (getVersion("fix1", DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
* // OLD CODE
* } else {
* // NEW CODE
* }
* }
* </pre>
*
* If the change is introduced after the loop starts executing, all iterations are going to use
* the default version, even if most of them happen after the change. This happens because the *
* getVersion call returns the same version for all calls that share a changeId. The same issue *
* arises when changing code in callbacks like signal or update handlers. Frequently, there is a
* need for a new version used for newer iterations (or signal handler invocations).
*
* <p>The following solution supports updating the version of each iteration separately, as it
* uses a different changeId for each iteration:
*
* <pre>
* for (int i=0; i<100; i++) {
* if (getVersion("fix1-" + i, DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
* // OLD CODE
* } else {
* // NEW CODE
* }
* }
* </pre>
*
* The drawback is a marker event as well as a search attribute update for each iteration. So, it
* is not practical for the large number of iterations.
*
* <p>This method provides an efficient alternative to the solution that uses a different changeId
* for each iteration. It only inserts a marker when a version changes.
*
* <p>Here is how it could be used:
*
* <pre>
* for (int i=0; i<100; i++) {
* if (getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
* // OLD CODE
* } else {
* // NEW CODE
* }
* }
* </pre>
*
* Adding more branches later is OK, assuming that the series stays the same. During replay, the
* iterationId should return the same values as in the original execution.
*
* <pre>
* for (int i=0; i<100; i++) {
* int v = getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 2);
* if (v == DEFAULT_VERSION) {
* // OLD CODE
* } else if (v == 1) {
* // CODE FOR THE FIRST CHANGE
* } else {
* // CODE FOR THE LAST CHANGE
* }
* }
* </pre>
*
* All calls with the same seriesId and iterationId argument return the same value. But only if
* they follow each other. The moment a call with a different iteration is made, the version
* changes.
*
* @param seriesId identifier of a series of changes.
* @param iterationId identifier of each iteration over the changed code.
* @param minSupported min version supported for the change
* @param maxSupported max version supported for the change, this version is used as the current
* one during the original execution.
* @return {@code maxSupported} when is originally executed. Original version recorded in the
* history on replays.
*/
public static int getVersion(
String seriesId, String iterationId, int minSupported, int maxSupported) {
return WorkflowInternal.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

/**
* Get scope for reporting business metrics in workflow logic. This should be used instead of
* creating new metrics scopes as it is able to dedupe metrics during replay.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.versionTests;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import io.temporal.api.enums.v1.EventType;
import io.temporal.api.history.v1.HistoryEvent;
import io.temporal.client.WorkflowStub;
import io.temporal.testing.internal.SDKTestOptions;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.WorkerOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import io.temporal.workflow.unsafe.WorkflowUnsafe;
import java.time.Duration;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;

public class GetVersionSeriesTest {

private static boolean hasReplayed;

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestGetVersionSeriesWorkflowImpl.class)
.setActivityImplementations(new TestActivitiesImpl())
// Forcing a replay. Full history arrived from a normal queue causing a replay.
.setWorkerOptions(
WorkerOptions.newBuilder()
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
.build())
.build();

@Test
public void testGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
assertTrue(hasReplayed);
assertEquals("foo", result);
WorkflowStub untyped = WorkflowStub.fromTyped(workflowStub);
List<HistoryEvent> markers =
testWorkflowRule.getHistoryEvents(
untyped.getExecution().getWorkflowId(), EventType.EVENT_TYPE_MARKER_RECORDED);
assertEquals(10, markers.size());
}

public static class TestGetVersionSeriesWorkflowImpl implements TestWorkflow1 {

@Override
public String execute(String taskQueue) {
VariousTestActivities testActivities =
Workflow.newActivityStub(
VariousTestActivities.class,
SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue));

for (int i = 0; i < 20; i++) {
// Test adding a version check in non-replay code.
int maxSupported = i / 2 + 1;
int version =
Workflow.getVersion(
"s1", String.valueOf(maxSupported), Workflow.DEFAULT_VERSION, maxSupported);
assertEquals(version, maxSupported);
testActivities.activity2("activity2", 2);
}

// Test adding a version check in replay code.
if (WorkflowUnsafe.isReplaying()) {
hasReplayed = true;
}
// Force replay
Workflow.sleep(1000);
return "foo";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ public class GetVersionTest {
public void testGetVersion() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
assertTrue(hasReplayed);
assertEquals("activity22activity1activity1activity1", result);
testWorkflowRule
.getInterceptor(TracingWorkerInterceptor.class)
.setExpected(
Expand All @@ -77,6 +74,10 @@ public void testGetVersion() {
"getVersion",
"executeActivity customActivity1",
"activity customActivity1");

String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
assertTrue(hasReplayed);
assertEquals("activity22activity1activity1activity1", result);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
throw new UnsupportedOperationException("not implemented");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ public int getVersion(String changeId, int minSupported, int maxSupported) {
return next.getVersion(changeId, minSupported, maxSupported);
}

@Override
public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) {
if (!WorkflowUnsafe.isReplaying()) {
trace.add("getVersionSeries");
}
return next.getVersion(seriesId, iterationId, minSupported, maxSupported);
}

@Override
public void continueAsNew(ContinueAsNewInput input) {
if (!WorkflowUnsafe.isReplaying()) {
Expand Down