Skip to content
18 changes: 17 additions & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"strings"
"sync/atomic"
"time"

"github.com/github/deployment-tracker/internal/metadata"
Expand Down Expand Up @@ -47,7 +48,8 @@ type ttlCache interface {
}

type deploymentRecordPoster interface {
PostOne(ctx context.Context, record *deploymentrecord.DeploymentRecord) error
PostOne(ctx context.Context, record *deploymentrecord.Record) error
PostCluster(ctx context.Context, records []*deploymentrecord.Record, cluster string) ([]byte, error)
}

type podMetadataAggregator interface {
Expand Down Expand Up @@ -87,6 +89,8 @@ type Controller struct {
// informerSyncTimeout is the maximum time allowed for all informers to sync
// and prevents sync from hanging indefinitely.
informerSyncTimeout time.Duration
// syncing tracks if the kubernetes informers have finished syncing
syncing atomic.Bool
}

// New creates a new deployment tracker controller.
Expand Down Expand Up @@ -147,10 +151,16 @@ func New(clientset kubernetes.Interface, metadataAggregator podMetadataAggregato
unknownArtifacts: amcache.NewExpiring(),
informerSyncTimeout: informerSyncTimeoutDuration,
}
cntrl.syncing.Store(true)

// Add event handlers to the informer
_, err = podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj any) {
// Skip adding sync events
if cntrl.syncing.Load() {
return
}
Comment on lines 158 to +162

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these events should still be captured. I will adjust the PR description.


pod, ok := obj.(*corev1.Pod)
if !ok {
slog.Error("Invalid object returned",
Expand Down Expand Up @@ -314,6 +324,12 @@ func (c *Controller) Run(ctx context.Context, workers int) error {
return errors.New("timed out waiting for caches to sync - please ensure deployment tracker has the correct kubernetes permissions")
}
}
c.syncing.Store(false)
syncClusterPods := c.podInformer.GetIndexer().List()
err := c.processSyncEvents(ctx, syncClusterPods)
if err != nil {
return fmt.Errorf("sync events failed: %w", err)
}

slog.Info("Starting workers",
"count", workers,
Expand Down
10 changes: 7 additions & 3 deletions internal/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,23 @@ import (

type mockRecordPoster struct {
mu sync.Mutex
records []*deploymentrecord.DeploymentRecord
records []*deploymentrecord.Record
err error // to simulate failures
}

func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.DeploymentRecord) error {
func (m *mockRecordPoster) PostOne(_ context.Context, record *deploymentrecord.Record) error {
m.mu.Lock()
defer m.mu.Unlock()
m.records = append(m.records, record)
return m.err
}

func (m *mockRecordPoster) PostCluster(_ context.Context, _ []*deploymentrecord.Record, _ string) ([]byte, error) {
return nil, nil
}

// Helper that allows tests to read captured records safely.
func (m *mockRecordPoster) getRecords() []*deploymentrecord.DeploymentRecord {
func (m *mockRecordPoster) getRecords() []*deploymentrecord.Record {
m.mu.Lock()
defer m.mu.Unlock()
return slices.Clone(m.records)
Expand Down
45 changes: 37 additions & 8 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/github/deployment-tracker/internal/metadata"
"github.com/github/deployment-tracker/internal/workload"
"github.com/github/deployment-tracker/pkg/deploymentrecord"
"github.com/stretchr/testify/assert"
Expand All @@ -23,35 +24,62 @@ import (

// mockPoster records all PostOne calls and returns a configurable error.
type mockPoster struct {
mu sync.Mutex
calls int
lastErr error
mu sync.Mutex
calls int
clusterCalls int
clusterRecordCount int
lastErr error
clusterResp []byte
clusterErr error
}

func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.DeploymentRecord) error {
func (m *mockPoster) PostOne(_ context.Context, _ *deploymentrecord.Record) error {
m.mu.Lock()
defer m.mu.Unlock()
m.calls++
return m.lastErr
}

func (m *mockPoster) getCalls() int {
func (m *mockPoster) PostCluster(_ context.Context, records []*deploymentrecord.Record, _ string) ([]byte, error) {
m.mu.Lock()
defer m.mu.Unlock()
m.clusterCalls++
m.clusterRecordCount = len(records)
return m.clusterResp, m.clusterErr
}

func (m *mockPoster) getPostOneCalls() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.calls
}

func (m *mockPoster) getPostClusterCalls() int {
m.mu.Lock()
defer m.mu.Unlock()
return m.clusterCalls
}

// mockResolver is a test double for the workloadResolver interface.
type mockResolver struct{}
type mockResolver struct {
name string
}

func (*mockResolver) Resolve(_ *corev1.Pod) workload.Identity {
return workload.Identity{}
func (m *mockResolver) Resolve(_ *corev1.Pod) workload.Identity {
return workload.Identity{Name: m.name}
}

func (*mockResolver) IsActive(_ string, _ workload.Identity) bool {
return false
}

// mockMetadataAggregator is a test double for the podMetadataAggregator interface.
type mockMetadataAggregator struct{}

func (*mockMetadataAggregator) BuildAggregatePodMetadata(_ context.Context, _ *metav1.PartialObjectMetadata) *metadata.AggregatePodMetadata {
return nil
}

// newTestController creates a minimal Controller suitable for unit-testing
// recordContainer without a real Kubernetes cluster.
func newTestController(poster *mockPoster) *Controller {
Expand All @@ -64,6 +92,7 @@ func newTestController(poster *mockPoster) *Controller {
Cluster: "test",
},
workloadResolver: &mockResolver{},
metadataAggregator: &mockMetadataAggregator{},
observedDeployments: amcache.NewExpiring(),
unknownArtifacts: amcache.NewExpiring(),
}
Expand Down
Loading