@@ -331,22 +331,11 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
331
331
serviceKey := cache.ObjectName {Namespace : svc .Namespace , Name : svc .Name }.String ()
332
332
rawEndpointSlices , err := sc .endpointSlicesInformer .Informer ().GetIndexer ().ByIndex (serviceNameIndexKey , serviceKey )
333
333
if err != nil {
334
- // Should never happen as long as the index exists
335
334
log .Errorf ("Get EndpointSlices of service[%s] error:%v" , svc .GetName (), err )
336
335
return nil
337
336
}
338
337
339
- endpointSlices := make ([]* discoveryv1.EndpointSlice , 0 , len (rawEndpointSlices ))
340
- for _ , obj := range rawEndpointSlices {
341
- endpointSlice , ok := obj .(* discoveryv1.EndpointSlice )
342
- if ! ok {
343
- // Should never happen as the indexer can only contain EndpointSlice objects
344
- log .Errorf ("Expected %T but got %T instead, skipping" , endpointSlice , obj )
345
- continue
346
- }
347
- endpointSlices = append (endpointSlices , endpointSlice )
348
- }
349
-
338
+ endpointSlices := convertToEndpointSlices (rawEndpointSlices )
350
339
pods , err := sc .podInformer .Lister ().Pods (svc .Namespace ).List (selector )
351
340
if err != nil {
352
341
log .Errorf ("List Pods of service[%s] error:%v" , svc .GetName (), err )
@@ -357,74 +346,63 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
357
346
publishPodIPs := endpointsType != EndpointsTypeNodeExternalIP && endpointsType != EndpointsTypeHostIP && ! sc .publishHostIP
358
347
publishNotReadyAddresses := svc .Spec .PublishNotReadyAddresses || sc .alwaysPublishNotReadyAddresses
359
348
349
+ targetsByHeadlessDomainAndType := sc .processHeadlessEndpointsFromSlices (
350
+ svc , pods , endpointSlices , hostname , endpointsType , publishPodIPs , publishNotReadyAddresses )
351
+ endpoints = buildHeadlessEndpoints (svc , targetsByHeadlessDomainAndType , ttl )
352
+ return endpoints
353
+ }
354
+
355
+ // Helper to convert raw objects to EndpointSlice
356
+ func convertToEndpointSlices (rawEndpointSlices []interface {}) []* discoveryv1.EndpointSlice {
357
+ endpointSlices := make ([]* discoveryv1.EndpointSlice , 0 , len (rawEndpointSlices ))
358
+ for _ , obj := range rawEndpointSlices {
359
+ endpointSlice , ok := obj .(* discoveryv1.EndpointSlice )
360
+ if ! ok {
361
+ log .Errorf ("Expected EndpointSlice but got %T instead, skipping" , obj )
362
+ continue
363
+ }
364
+ endpointSlices = append (endpointSlices , endpointSlice )
365
+ }
366
+ return endpointSlices
367
+ }
368
+
369
+ // processHeadlessEndpointsFromSlices processes EndpointSlices specifically for headless services
370
+ // and returns deduped targets by domain/type.
371
+ // TODO: Consider refactoring with generics when available: https://github.com/kubernetes/kubernetes/issues/133544
372
+ func (sc * serviceSource ) processHeadlessEndpointsFromSlices (
373
+ svc * v1.Service ,
374
+ pods []* v1.Pod ,
375
+ endpointSlices []* discoveryv1.EndpointSlice ,
376
+ hostname string ,
377
+ endpointsType string ,
378
+ publishPodIPs bool ,
379
+ publishNotReadyAddresses bool ,
380
+ ) map [endpoint.EndpointKey ]endpoint.Targets {
360
381
targetsByHeadlessDomainAndType := make (map [endpoint.EndpointKey ]endpoint.Targets )
361
382
for _ , endpointSlice := range endpointSlices {
362
383
for _ , ep := range endpointSlice .Endpoints {
363
384
if ! conditionToBool (ep .Conditions .Ready ) && ! publishNotReadyAddresses {
364
385
continue
365
386
}
366
-
367
- if publishPodIPs &&
368
- endpointSlice .AddressType != discoveryv1 .AddressTypeIPv4 &&
369
- endpointSlice .AddressType != discoveryv1 .AddressTypeIPv6 {
387
+ if publishPodIPs && endpointSlice .AddressType != discoveryv1 .AddressTypeIPv4 && endpointSlice .AddressType != discoveryv1 .AddressTypeIPv6 {
370
388
log .Debugf ("Skipping EndpointSlice %s/%s because its address type is unsupported: %s" , endpointSlice .Namespace , endpointSlice .Name , endpointSlice .AddressType )
371
389
continue
372
390
}
373
-
374
- // find pod for this address
375
- if ep .TargetRef == nil || ep .TargetRef .APIVersion != "" || ep .TargetRef .Kind != "Pod" {
376
- log .Debugf ("Skipping address because its target is not a pod: %v" , ep )
377
- continue
378
- }
379
- var pod * v1.Pod
380
- for _ , v := range pods {
381
- if v .Name == ep .TargetRef .Name {
382
- pod = v
383
- break
384
- }
385
- }
391
+ pod := findPodForEndpoint (ep , pods )
386
392
if pod == nil {
387
- log .Errorf ("Pod %s not found for address %v" , ep .TargetRef .Name , ep )
393
+ if ep .TargetRef != nil {
394
+ log .Errorf ("Pod %s not found for address %v" , ep .TargetRef .Name , ep )
395
+ } else {
396
+ log .Errorf ("Pod not found for endpoint with nil TargetRef: %v" , ep )
397
+ }
388
398
continue
389
399
}
390
-
391
400
headlessDomains := []string {hostname }
392
401
if pod .Spec .Hostname != "" {
393
402
headlessDomains = append (headlessDomains , fmt .Sprintf ("%s.%s" , pod .Spec .Hostname , hostname ))
394
403
}
395
-
396
404
for _ , headlessDomain := range headlessDomains {
397
- targets := annotations .TargetsFromTargetAnnotation (pod .Annotations )
398
- if len (targets ) == 0 {
399
- if endpointsType == EndpointsTypeNodeExternalIP {
400
- if sc .nodeInformer == nil {
401
- log .Warnf ("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer" , endpointSlice .Namespace , endpointSlice .Name )
402
- continue
403
- }
404
- node , err := sc .nodeInformer .Lister ().Get (pod .Spec .NodeName )
405
- if err != nil {
406
- log .Errorf ("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints" , pod .Spec .NodeName , pod .GetName (), err )
407
- return endpoints
408
- }
409
- for _ , address := range node .Status .Addresses {
410
- if address .Type == v1 .NodeExternalIP || (sc .exposeInternalIPv6 && address .Type == v1 .NodeInternalIP && suitableType (address .Address ) == endpoint .RecordTypeAAAA ) {
411
- targets = append (targets , address .Address )
412
- log .Debugf ("Generating matching endpoint %s with NodeExternalIP %s" , headlessDomain , address .Address )
413
- }
414
- }
415
- } else if endpointsType == EndpointsTypeHostIP || sc .publishHostIP {
416
- targets = endpoint.Targets {pod .Status .HostIP }
417
- log .Debugf ("Generating matching endpoint %s with HostIP %s" , headlessDomain , pod .Status .HostIP )
418
- } else {
419
- if len (ep .Addresses ) == 0 {
420
- log .Warnf ("EndpointSlice %s/%s has no addresses for endpoint %v" , endpointSlice .Namespace , endpointSlice .Name , ep )
421
- continue
422
- }
423
- address := ep .Addresses [0 ] // Only use the first address, as additional addresses have no semantic defined
424
- targets = endpoint.Targets {address }
425
- log .Debugf ("Generating matching endpoint %s with EndpointSliceAddress IP %s" , headlessDomain , address )
426
- }
427
- }
405
+ targets := sc .getTargetsForDomain (pod , ep , endpointSlice , endpointsType , headlessDomain )
428
406
for _ , target := range targets {
429
407
key := endpoint.EndpointKey {
430
408
DNSName : headlessDomain ,
@@ -435,8 +413,68 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
435
413
}
436
414
}
437
415
}
416
+ // Return a copy of the map to prevent external modifications
417
+ result := make (map [endpoint.EndpointKey ]endpoint.Targets , len (targetsByHeadlessDomainAndType ))
418
+ for k , v := range targetsByHeadlessDomainAndType {
419
+ result [k ] = append (endpoint .Targets (nil ), v ... )
420
+ }
421
+ return result
422
+ }
423
+
424
+ // Helper to find pod for endpoint
425
+ func findPodForEndpoint (ep discoveryv1.Endpoint , pods []* v1.Pod ) * v1.Pod {
426
+ if ep .TargetRef == nil || ep .TargetRef .APIVersion != "" || ep .TargetRef .Kind != "Pod" {
427
+ log .Debugf ("Skipping address because its target is not a pod: %v" , ep )
428
+ return nil
429
+ }
430
+ for _ , v := range pods {
431
+ if v .Name == ep .TargetRef .Name {
432
+ return v
433
+ }
434
+ }
435
+ return nil
436
+ }
438
437
439
- headlessKeys := []endpoint.EndpointKey {}
438
+ // Helper to get targets for domain
439
+ func (sc * serviceSource ) getTargetsForDomain (pod * v1.Pod , ep discoveryv1.Endpoint , endpointSlice * discoveryv1.EndpointSlice , endpointsType string , headlessDomain string ) endpoint.Targets {
440
+ targets := annotations .TargetsFromTargetAnnotation (pod .Annotations )
441
+ if len (targets ) == 0 {
442
+ if endpointsType == EndpointsTypeNodeExternalIP {
443
+ if sc .nodeInformer == nil {
444
+ log .Warnf ("Skipping EndpointSlice %s/%s as --service-type-filter disable node informer" , endpointSlice .Namespace , endpointSlice .Name )
445
+ return nil
446
+ }
447
+ node , err := sc .nodeInformer .Lister ().Get (pod .Spec .NodeName )
448
+ if err != nil {
449
+ log .Errorf ("Get node[%s] of pod[%s] error: %v; not adding any NodeExternalIP endpoints" , pod .Spec .NodeName , pod .GetName (), err )
450
+ return nil
451
+ }
452
+ for _ , address := range node .Status .Addresses {
453
+ if address .Type == v1 .NodeExternalIP || (sc .exposeInternalIPv6 && address .Type == v1 .NodeInternalIP && suitableType (address .Address ) == endpoint .RecordTypeAAAA ) {
454
+ targets = append (targets , address .Address )
455
+ log .Debugf ("Generating matching endpoint %s with NodeExternalIP %s" , headlessDomain , address .Address )
456
+ }
457
+ }
458
+ } else if endpointsType == EndpointsTypeHostIP || sc .publishHostIP {
459
+ targets = endpoint.Targets {pod .Status .HostIP }
460
+ log .Debugf ("Generating matching endpoint %s with HostIP %s" , headlessDomain , pod .Status .HostIP )
461
+ } else {
462
+ if len (ep .Addresses ) == 0 {
463
+ log .Warnf ("EndpointSlice %s/%s has no addresses for endpoint %v" , endpointSlice .Namespace , endpointSlice .Name , ep )
464
+ return nil
465
+ }
466
+ address := ep .Addresses [0 ]
467
+ targets = endpoint.Targets {address }
468
+ log .Debugf ("Generating matching endpoint %s with EndpointSliceAddress IP %s" , headlessDomain , address )
469
+ }
470
+ }
471
+ return targets
472
+ }
473
+
474
+ // Helper to build endpoints from deduped targets
475
+ func buildHeadlessEndpoints (svc * v1.Service , targetsByHeadlessDomainAndType map [endpoint.EndpointKey ]endpoint.Targets , ttl endpoint.TTL ) []* endpoint.Endpoint {
476
+ var endpoints []* endpoint.Endpoint
477
+ headlessKeys := make ([]endpoint.EndpointKey , 0 , len (targetsByHeadlessDomainAndType ))
440
478
for headlessKey := range targetsByHeadlessDomainAndType {
441
479
headlessKeys = append (headlessKeys , headlessKey )
442
480
}
@@ -449,31 +487,26 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
449
487
for _ , headlessKey := range headlessKeys {
450
488
allTargets := targetsByHeadlessDomainAndType [headlessKey ]
451
489
targets := []string {}
452
-
453
490
deduppedTargets := map [string ]struct {}{}
454
491
for _ , target := range allTargets {
455
492
if _ , ok := deduppedTargets [target ]; ok {
456
493
log .Debugf ("Removing duplicate target %s" , target )
457
494
continue
458
495
}
459
-
460
496
deduppedTargets [target ] = struct {}{}
461
497
targets = append (targets , target )
462
498
}
463
-
464
499
var ep * endpoint.Endpoint
465
500
if ttl .IsConfigured () {
466
501
ep = endpoint .NewEndpointWithTTL (headlessKey .DNSName , headlessKey .RecordType , ttl , targets ... )
467
502
} else {
468
503
ep = endpoint .NewEndpoint (headlessKey .DNSName , headlessKey .RecordType , targets ... )
469
504
}
470
-
471
505
if ep != nil {
472
506
ep .WithLabel (endpoint .ResourceLabelKey , fmt .Sprintf ("service/%s/%s" , svc .Namespace , svc .Name ))
473
507
endpoints = append (endpoints , ep )
474
508
}
475
509
}
476
-
477
510
return endpoints
478
511
}
479
512
0 commit comments