Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions controllers/elbv2/targetgroupbinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func (r *targetGroupBindingReconciler) reconcileTargetGroupBinding(ctx context.C
if deferred {
r.deferredTargetGroupBindingReconciler.Enqueue(tgb)
return nil
} else {
r.deferredTargetGroupBindingReconciler.MarkProcessed(tgb)
}

updateTargetGroupBindingStatusFn := func() {
Expand Down
51 changes: 36 additions & 15 deletions controllers/elbv2/targetgroupbinding_deferred_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package controllers

import (
"context"
"k8s.io/apimachinery/pkg/util/cache"
"math/rand"
"sync"
"time"

"github.com/go-logr/logr"
Expand All @@ -28,39 +30,54 @@ const (

type DeferredTargetGroupBindingReconciler interface {
Enqueue(tgb *elbv2api.TargetGroupBinding)
MarkProcessed(tgb *elbv2api.TargetGroupBinding)
Run()
}

type deferredTargetGroupBindingReconcilerImpl struct {
delayQueue workqueue.DelayingInterface
syncPeriod time.Duration
k8sClient client.Client
logger logr.Logger

processedTGBCache *cache.Expiring
processedTGBCacheTTL time.Duration
processedTGBCacheMutex sync.RWMutex

delayedReconcileTime time.Duration
maxJitter time.Duration
}

func NewDeferredTargetGroupBindingReconciler(delayQueue workqueue.DelayingInterface, syncPeriod time.Duration, k8sClient client.Client, logger logr.Logger) DeferredTargetGroupBindingReconciler {
return &deferredTargetGroupBindingReconcilerImpl{
syncPeriod: syncPeriod,
logger: logger,
delayQueue: delayQueue,
k8sClient: k8sClient,
logger: logger,
delayQueue: delayQueue,
k8sClient: k8sClient,
processedTGBCache: cache.NewExpiring(),
processedTGBCacheTTL: syncPeriod,

delayedReconcileTime: defaultDelayedReconcileTime,
maxJitter: defaultMaxJitter,
}
}

// Enqueue enqueues a TGB iff it's not been processed recently.
func (d *deferredTargetGroupBindingReconcilerImpl) Enqueue(tgb *elbv2api.TargetGroupBinding) {
nsn := k8s.NamespacedName(tgb)
if d.isEligibleForDefer(tgb) {
if !d.tgbInCache(tgb) {
d.enqueue(nsn)
d.logger.Info("enqueued new deferred TGB", "tgb", nsn.Name)
}
}

// MarkProcessed updates the local cache to signify that the TGB has been processed recently and is not eligible to be deferred.
func (d *deferredTargetGroupBindingReconcilerImpl) MarkProcessed(tgb *elbv2api.TargetGroupBinding) {
if d.tgbInCache(tgb) {
return
}
d.updateCache(k8s.NamespacedName(tgb))
}

// Run starts a loop to process deferred items off the delaying queue.
func (d *deferredTargetGroupBindingReconcilerImpl) Run() {
var item interface{}
shutDown := false
Expand All @@ -87,12 +104,6 @@ func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItem(nsn types.
return
}

// Re-check that this tgb hasn't been updated since it was enqueued
if !d.isEligibleForDefer(tgb) {
d.logger.Info("TGB not eligible for deferral", "tgb", nsn)
return
}

tgbOld := tgb.DeepCopy()
targetgroupbinding.SaveTGBReconcileCheckpoint(tgb, resetHash)

Expand All @@ -111,12 +122,16 @@ func (d *deferredTargetGroupBindingReconcilerImpl) handleDeferredItemError(nsn t
}
}

func (d *deferredTargetGroupBindingReconcilerImpl) isEligibleForDefer(tgb *elbv2api.TargetGroupBinding) bool {
then := time.Unix(targetgroupbinding.GetTGBReconcileCheckpointTimestamp(tgb), 0)
return time.Now().Sub(then) > d.syncPeriod
func (d *deferredTargetGroupBindingReconcilerImpl) tgbInCache(tgb *elbv2api.TargetGroupBinding) bool {
d.processedTGBCacheMutex.RLock()
defer d.processedTGBCacheMutex.RUnlock()

_, exists := d.processedTGBCache.Get(k8s.NamespacedName(tgb))
return exists
}

func (d *deferredTargetGroupBindingReconcilerImpl) enqueue(nsn types.NamespacedName) {
d.updateCache(nsn)
delayedTime := d.jitterReconcileTime()
d.delayQueue.AddAfter(nsn, delayedTime)
}
Expand All @@ -129,3 +144,9 @@ func (d *deferredTargetGroupBindingReconcilerImpl) jitterReconcileTime() time.Du

return d.delayedReconcileTime + time.Duration(rand.Int63n(int64(d.maxJitter)))
}

func (d *deferredTargetGroupBindingReconcilerImpl) updateCache(nsn types.NamespacedName) {
d.processedTGBCacheMutex.Lock()
defer d.processedTGBCacheMutex.Unlock()
d.processedTGBCache.Set(nsn, true, d.processedTGBCacheTTL)
}
59 changes: 22 additions & 37 deletions controllers/elbv2/targetgroupbinding_deferred_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/sets"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/util/workqueue"
elbv2api "sigs.k8s.io/aws-load-balancer-controller/apis/elbv2/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/annotations"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
testclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/log"
"strconv"
Expand All @@ -31,15 +33,14 @@ func TestDeferredReconcilerConstructor(t *testing.T) {

deferredReconciler := d.(*deferredTargetGroupBindingReconcilerImpl)
assert.Equal(t, dq, deferredReconciler.delayQueue)
assert.Equal(t, syncPeriod, deferredReconciler.syncPeriod)
assert.Equal(t, k8sClient, deferredReconciler.k8sClient)
assert.Equal(t, logger, deferredReconciler.logger)
}

func TestDeferredReconcilerEnqueue(t *testing.T) {
syncPeriod := 5 * time.Minute
testCases := []struct {
name string
tgbInCache []*elbv2api.TargetGroupBinding
tgbToEnqueue []*elbv2api.TargetGroupBinding
expectedQueueEntries sets.Set[types.NamespacedName]
}{
Expand All @@ -59,8 +60,8 @@ func TestDeferredReconcilerEnqueue(t *testing.T) {
}),
},
{
name: "sync period too short, do not enqueue",
tgbToEnqueue: []*elbv2api.TargetGroupBinding{
name: "item in cache, no enqueue",
tgbInCache: []*elbv2api.TargetGroupBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "tgb1",
Expand All @@ -71,25 +72,18 @@ func TestDeferredReconcilerEnqueue(t *testing.T) {
},
},
},
expectedQueueEntries: make(sets.Set[types.NamespacedName]),
},
{
name: "sync period too long, do enqueue",
tgbToEnqueue: []*elbv2api.TargetGroupBinding{
{
ObjectMeta: metav1.ObjectMeta{
Name: "tgb1",
Namespace: "ns",
Annotations: map[string]string{
annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Add(-2*syncPeriod).Unix(), 10),
annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10),
},
},
},
},
expectedQueueEntries: sets.New(types.NamespacedName{
Name: "tgb1",
Namespace: "ns",
}),
expectedQueueEntries: make(sets.Set[types.NamespacedName]),
},
{
name: "multiple tgb",
Expand Down Expand Up @@ -196,15 +190,20 @@ func TestDeferredReconcilerEnqueue(t *testing.T) {
Build()

impl := deferredTargetGroupBindingReconcilerImpl{
delayQueue: dq,
syncPeriod: syncPeriod,
k8sClient: k8sClient,
logger: logr.New(&log.NullLogSink{}),
delayQueue: dq,
k8sClient: k8sClient,
logger: logr.New(&log.NullLogSink{}),
processedTGBCache: cache.NewExpiring(),
processedTGBCacheTTL: 1 * time.Minute,

delayedReconcileTime: 0 * time.Millisecond,
maxJitter: 0 * time.Millisecond,
}

for _, tgb := range tc.tgbInCache {
impl.processedTGBCache.Set(k8s.NamespacedName(tgb), true, time.Minute*1)
}

for _, tgb := range tc.tgbToEnqueue {
impl.Enqueue(tgb)
}
Expand Down Expand Up @@ -261,10 +260,12 @@ func TestDeferredReconcilerRun(t *testing.T) {

impl := deferredTargetGroupBindingReconcilerImpl{
delayQueue: dq,
syncPeriod: 5 * time.Minute,
k8sClient: k8sClient,
logger: logr.New(&log.NullLogSink{}),

processedTGBCache: cache.NewExpiring(),
processedTGBCacheTTL: 1 * time.Minute,

delayedReconcileTime: 0 * time.Millisecond,
maxJitter: 0 * time.Millisecond,
}
Expand Down Expand Up @@ -292,24 +293,6 @@ func TestHandleDeferredItem(t *testing.T) {
Namespace: "ns",
},
},
{
name: "not eligible",
nsn: types.NamespacedName{
Name: "name",
Namespace: "ns",
},
storedTGB: &elbv2api.TargetGroupBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "ns",
Annotations: map[string]string{
annotations.AnnotationCheckPoint: "foo",
annotations.AnnotationCheckPointTimestamp: strconv.FormatInt(time.Now().Unix(), 10),
},
},
},
expectedCheckPoint: aws.String("foo"),
},
{
name: "eligible",
nsn: types.NamespacedName{
Expand Down Expand Up @@ -355,10 +338,12 @@ func TestHandleDeferredItem(t *testing.T) {

impl := deferredTargetGroupBindingReconcilerImpl{
delayQueue: dq,
syncPeriod: syncPeriod,
k8sClient: k8sClient,
logger: logr.New(&log.NullLogSink{}),

processedTGBCache: cache.NewExpiring(),
processedTGBCacheTTL: 1 * time.Minute,

delayedReconcileTime: 0 * time.Millisecond,
maxJitter: 0 * time.Millisecond,
}
Expand Down
Loading