Skip to content

Commit d1d689b

Browse files
jswxstwisubasinghe
authored andcommitted
fix: process aggregate outputs for steps node with retries. Fixes #14647 (#14651)
Signed-off-by: oninowang <[email protected]>
1 parent e0a0f28 commit d1d689b

File tree

4 files changed

+53
-6
lines changed

4 files changed

+53
-6
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
apiVersion: argoproj.io/v1alpha1
2+
kind: Workflow
3+
metadata:
4+
name: parameter-aggregation-steps-with-retry
5+
spec:
6+
retryStrategy:
7+
limit: 1
8+
entrypoint: fanout-steps-with-output
9+
templates:
10+
- name: echo-value
11+
inputs:
12+
parameters:
13+
- name: message
14+
container:
15+
image: argoproj/argosay:v2
16+
outputs:
17+
parameters:
18+
- name: dummy-output
19+
value: '{{inputs.parameters.message}}'
20+
- name: fanout-steps-with-output
21+
steps:
22+
- - name: echo-list
23+
template: echo-value
24+
arguments:
25+
parameters:
26+
- name: message
27+
value: '{{item}}'
28+
withItems: [1, 2, 3]
29+
outputs:
30+
parameters:
31+
- name: dummy-steps-output
32+
valueFrom:
33+
parameter: '{{steps.echo-list.outputs.parameters.dummy-output}}'

test/e2e/functional_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,24 @@ func (s *FunctionalSuite) TestParameterAggregationDAGWithRetry() {
599599
})
600600
}
601601

602+
func (s *FunctionalSuite) TestParameterAggregationStepsWithRetry() {
603+
s.Given().
604+
Workflow("@functional/parameter-aggregation-steps-with-retry.yaml").
605+
When().
606+
SubmitWorkflow().
607+
WaitForWorkflow(time.Second * 90).
608+
Then().
609+
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
610+
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
611+
nodeStatus := status.Nodes.FindByDisplayName("parameter-aggregation-steps-with-retry(0)")
612+
require.NotNil(t, nodeStatus)
613+
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
614+
require.NotNil(t, nodeStatus.Outputs)
615+
assert.Len(t, nodeStatus.Outputs.Parameters, 1)
616+
assert.Equal(t, `["1","2","3"]`, nodeStatus.Outputs.Parameters[0].Value.String())
617+
})
618+
}
619+
602620
func (s *FunctionalSuite) TestDAGDepends() {
603621
s.Given().
604622
Workflow("@functional/dag-depends.yaml").

workflow/controller/dag.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -670,10 +670,6 @@ func (woc *wfOperationCtx) buildLocalScopeFromTask(dagCtx *dagContext, task *wfv
670670
var ancestorNodes []wfv1.NodeStatus
671671
for _, node := range woc.wf.Status.Nodes {
672672
if node.BoundaryID == dagCtx.boundaryID && strings.HasPrefix(node.Name, ancestorNode.Name+"(") {
673-
// Filter retried nodes and only aggregate outputs of their parent nodes.
674-
if node.NodeFlag != nil && node.NodeFlag.Retried {
675-
continue
676-
}
677673
ancestorNodes = append(ancestorNodes, node)
678674
}
679675
}

workflow/controller/operator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3316,10 +3316,10 @@ func (woc *wfOperationCtx) processAggregateNodeOutputs(scope *wfScope, prefix st
33163316
if len(childNodes) == 0 {
33173317
return nil
33183318
}
3319-
// Some of the children may be hooks, only keep those that aren't
3319+
// Some of the children may be hooks and some of the children may be retried nodes, only keep those that aren't
33203320
nodeIdx := 0
33213321
for i := range childNodes {
3322-
if childNodes[i].NodeFlag == nil || !childNodes[i].NodeFlag.Hooked {
3322+
if childNodes[i].NodeFlag == nil || (!childNodes[i].NodeFlag.Hooked && !childNodes[i].NodeFlag.Retried) {
33233323
childNodes[nodeIdx] = childNodes[i]
33243324
nodeIdx++
33253325
}

0 commit comments

Comments
 (0)