Skip to content
Merged
6 changes: 6 additions & 0 deletions cmd/committee-api/design/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,12 @@ var CommitteeInviteWithReadonlyAttributes = dsl.Type("committee-invite-with-read
dsl.Example("7cad5a8d-19d0-41a4-81a6-043453daf9ee")
dsl.Format(dsl.FormatUUID)
})
dsl.Attribute("committee_name", dsl.String, "Name of the committee at the time the invite was created", func() {
dsl.Example("Technical Steering Committee")
})
dsl.Attribute("organization_required", dsl.Boolean, "Whether the invitee must supply an organization when accepting. True when the committee has voting enabled or requires a business email.", func() {
dsl.Example(false)
})
dsl.Attribute("invitee_email", dsl.String, "Email of the invited person", func() {
dsl.Format(dsl.FormatEmail)
dsl.Example("invitee@example.com")
Expand Down
56 changes: 50 additions & 6 deletions cmd/committee-api/service/committee_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ func (s *committeeServicesrvc) GetInvite(ctx context.Context, p *committeeservic
return nil, wrapError(ctx, errors.NewNotFound("invite not found in this committee"))
}

s.enrichInviteFromCommittee(ctx, invite, p.UID)

return s.convertInviteDomainToResponse(invite), nil
Comment thread
andrest50 marked this conversation as resolved.
}

Expand All @@ -657,6 +659,14 @@ func (s *committeeServicesrvc) CreateInvite(ctx context.Context, p *committeeser
return nil, wrapError(ctx, err)
}

// Best-effort: settings drive organization_required; missing settings means false.
committeeSettings, _, settingsErr := s.storage.GetSettings(ctx, p.UID)
if settingsErr != nil {
slog.WarnContext(ctx, "CreateInvite: failed to get committee settings for organization_required",
"committee_uid", p.UID, "error", settingsErr)
}
orgRequired := committeeBase.EnableVoting || (committeeSettings != nil && committeeSettings.BusinessEmailRequired)

var inviteOrgID, inviteOrgName, inviteOrgWebsite *string
if p.Organization != nil {
inviteOrgID = p.Organization.ID
Expand All @@ -666,12 +676,14 @@ func (s *committeeServicesrvc) CreateInvite(ctx context.Context, p *committeeser
inviteOrganization := organizationPtrFromFields(inviteOrgID, inviteOrgName, inviteOrgWebsite)

invite := &model.CommitteeInvite{
UID: uuid.New().String(),
CommitteeUID: p.UID,
InviteeEmail: p.InviteeEmail,
Organization: inviteOrganization,
Status: "pending",
CreatedAt: time.Now().UTC(),
UID: uuid.New().String(),
CommitteeUID: p.UID,
CommitteeName: committeeBase.Name,
OrganizationRequired: orgRequired,
InviteeEmail: p.InviteeEmail,
Organization: inviteOrganization,
Status: "pending",
CreatedAt: time.Now().UTC(),
Comment thread
andrest50 marked this conversation as resolved.
}
if p.Role != nil {
invite.Role = *p.Role
Expand Down Expand Up @@ -707,6 +719,8 @@ func (s *committeeServicesrvc) CreateInvite(ctx context.Context, p *committeeser
return nil, wrapError(ctx, errGet)
}
revokedInvite.Status = "pending"
revokedInvite.CommitteeName = committeeBase.Name
revokedInvite.OrganizationRequired = orgRequired
if p.Role != nil {
revokedInvite.Role = *p.Role
}
Expand Down Expand Up @@ -855,6 +869,7 @@ func (s *committeeServicesrvc) RevokeInvite(ctx context.Context, p *committeeser
}

invite.Status = "revoked"
s.enrichInviteFromCommittee(ctx, invite, p.UID)
if err := s.storage.UpdateInvite(ctx, invite, rev); err != nil {
return wrapError(ctx, err)
}
Expand Down Expand Up @@ -923,6 +938,7 @@ func (s *committeeServicesrvc) AcceptInvite(ctx context.Context, p *committeeser

// Member created successfully — now mark the invite accepted.
invite.Status = "accepted"
s.enrichInviteFromCommittee(ctx, invite, p.UID)
if err := s.storage.UpdateInvite(ctx, invite, rev); err != nil {
return nil, wrapError(ctx, err)
}
Expand Down Expand Up @@ -966,6 +982,7 @@ func (s *committeeServicesrvc) DeclineInvite(ctx context.Context, p *committeese
}

invite.Status = "declined"
s.enrichInviteFromCommittee(ctx, invite, p.UID)
if err := s.storage.UpdateInvite(ctx, invite, rev); err != nil {
return nil, wrapError(ctx, err)
}
Expand Down Expand Up @@ -1311,6 +1328,33 @@ func (s *committeeServicesrvc) resolveCallerEmail(ctx context.Context) (string,
return userEmails.PrimaryEmail, nil
}

// enrichInviteFromCommittee populates invite fields derived from the committee.
// It sets CommitteeName when missing and refreshes OrganizationRequired from the
// committee's current settings (voting enabled or business email required).
// Best-effort: a GetBase failure leaves the invite fully unchanged. A GetSettings
// failure leaves OrganizationRequired unchanged (CommitteeName may already have
// been backfilled). All errors are logged.
func (s *committeeServicesrvc) enrichInviteFromCommittee(ctx context.Context, invite *model.CommitteeInvite, committeeUID string) {
cb, _, err := s.storage.GetBase(ctx, committeeUID)
if err != nil {
slog.WarnContext(ctx, "enrichInviteFromCommittee: failed to get committee base",
"committee_uid", committeeUID, "error", err)
return
}
if invite.CommitteeName == "" {
invite.CommitteeName = cb.Name
}
settings, _, settingsErr := s.storage.GetSettings(ctx, committeeUID)
if settingsErr != nil {
// Leave OrganizationRequired unchanged on a transient settings failure rather than
// clobbering a correctly-stored value with one derived from nil settings.
slog.WarnContext(ctx, "enrichInviteFromCommittee: failed to get committee settings",
"committee_uid", committeeUID, "error", settingsErr)
return
}
invite.OrganizationRequired = cb.EnableVoting || settings.BusinessEmailRequired
}

// publishInviteIndexerMessage publishes an indexer message for invite operations.
// Publishing is best-effort: failures are logged but do not fail the request.
// IndexingConfig is required because the indexer is data-agnostic; publishers supply all indexing metadata.
Expand Down
4 changes: 4 additions & 0 deletions cmd/committee-api/service/committee_service_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,10 @@ func (s *committeeServicesrvc) convertInviteDomainToResponse(invite *model.Commi
InviteeEmail: &invite.InviteeEmail,
Status: invite.Status,
}
if invite.CommitteeName != "" {
result.CommitteeName = &invite.CommitteeName
}
result.OrganizationRequired = &invite.OrganizationRequired
if invite.Role != "" {
result.Role = &invite.Role
}
Expand Down
41 changes: 41 additions & 0 deletions cmd/committee-api/service/committee_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -681,11 +681,50 @@ func TestGetInvite(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, result)
assert.Equal(t, tt.payload.InviteUID, *result.UID)
assert.Equal(t, "Technical Advisory Committee", *result.CommitteeName)
assert.True(t, *result.OrganizationRequired)
}
})
}
}

func TestGetInvite_SettingsFailurePreservesOrganizationRequired(t *testing.T) {
// When GetSettings fails (committee has no settings), enrichInviteFromCommittee must
// leave the existing OrganizationRequired value intact rather than clobbering it with
// a value derived from nil settings (which would incorrectly evaluate to false).
svc, _, repo := setupServiceTestWithRepo()

// Add a committee with no settings — GetSettings will return NotFound.
repo.AddCommittee(&model.Committee{
CommitteeBase: model.CommitteeBase{
UID: "no-settings-committee",
ProjectUID: "proj-1",
Name: "No Settings Committee",
EnableVoting: false,
},
CommitteeSettings: nil,
})
// Seed an invite with OrganizationRequired already set to true.
repo.AddCommitteeInvite(&model.CommitteeInvite{
UID: "invite-no-settings",
CommitteeUID: "no-settings-committee",
InviteeEmail: "test@example.com",
Status: "pending",
OrganizationRequired: true,
})

result, err := svc.GetInvite(context.Background(), &committeeservice.GetInvitePayload{
UID: "no-settings-committee",
InviteUID: "invite-no-settings",
})

require.NoError(t, err)
require.NotNil(t, result)
// OrganizationRequired must be preserved despite the settings failure.
require.NotNil(t, result.OrganizationRequired)
assert.True(t, *result.OrganizationRequired, "settings failure must not clobber a correctly-stored OrganizationRequired=true")
}

func TestCreateInvite(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -732,6 +771,8 @@ func TestCreateInvite(t *testing.T) {
assert.Equal(t, tt.payload.UID, *result.CommitteeUID)
assert.Equal(t, tt.payload.InviteeEmail, *result.InviteeEmail)
assert.Equal(t, "pending", result.Status)
assert.Equal(t, "Technical Advisory Committee", *result.CommitteeName)
assert.True(t, *result.OrganizationRequired)

require.Len(t, sender.calls, 1)
call := sender.calls[0]
Expand Down
38 changes: 38 additions & 0 deletions cmd/committee-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,44 @@ NATS_URL=nats://localhost:4222 \
committee-cli sync members-by-committee-index --committee-uid=abc-123
```

#### `sync reindex-invites`

Re-publishes all committee invites from NATS KV to OpenSearch (via the indexer) and OpenFGA (via fga-sync). For each invite, the command fetches the parent committee's base and settings, backfills `committee_name` (if missing) and recomputes `organization_required` (`enable_voting || business_email_required`), persists any changed fields back to NATS KV, then publishes the updated invite to the indexer and access-control subjects.

Committee base and settings are fetched once per unique committee UID and cached for the duration of the run. If the committee fetch fails the invite is published as-is without modifying its stored fields.

**Subcommand flags**

| Flag | Default | Description |
|---|---|---|
| `--committee-uid` | `""` | Limit reindex to invites of a single committee |
| `--sleep` | `0` | Wait between each invite publish (e.g. `200ms`, `1s`) |
| `--dry-run` | `false` | Log what would be published and updated without writing |

**Exit code:** `0` if no invites failed, `1` otherwise.

**Output:** Structured JSON log line on completion with fields `total`, `updated`, `skipped`, `failed`, `duration_ms`, `rate_per_sec`.

**Examples**

Dry-run to preview scope (safe first step):
```sh
NATS_URL=nats://localhost:4222 LOG_LEVEL=info \
committee-cli sync reindex-invites --dry-run
```

Full reindex with a 200ms pause between invites:
```sh
NATS_URL=nats://localhost:4222 \
committee-cli sync reindex-invites --sleep=200ms
```

Reindex a single committee's invites:
```sh
NATS_URL=nats://localhost:4222 \
committee-cli sync reindex-invites --committee-uid=abc-123
```

## Building

### Local binary
Expand Down
3 changes: 3 additions & 0 deletions cmd/committee-cli/commands/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type RunContext struct {
// (e.g. IndexMemberByCommittee). This is used by data-repair subcommands that bypass the
// business-logic orchestrator and write to the storage layer directly.
CommitteeMemberWriter port.CommitteeMemberWriter
// CommitteeInviteWriter provides direct storage-layer access to invite write operations
// (e.g. backfilling fields during reindex). Bypasses the business-logic orchestrator.
CommitteeInviteWriter port.CommitteeInviteWriter
// Publisher provides direct access to indexer and access-control messaging (e.g. reindex
// subcommands that need to publish without going through the writer orchestrator).
Publisher port.CommitteePublisher
Expand Down
119 changes: 105 additions & 14 deletions cmd/committee-cli/commands/sync/reindex_invites.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func (s *reindexInvitesSubcommand) Run(ctx context.Context, rc commands.RunConte
if rc.Publisher == nil {
return errs.NewUnexpected("publisher is not wired in RunContext")
}
if rc.CommitteeInviteWriter == nil {
return errs.NewUnexpected("CommitteeInviteWriter is not wired in RunContext")
}

ctx = context.WithValue(ctx, constants.AuthorizationContextID, "Bearer lfx-v2-committee-service")

Expand All @@ -75,12 +78,69 @@ func (s *reindexInvitesSubcommand) Run(ctx context.Context, rc commands.RunConte
stats.Total = len(invites)
stats.DryRun = rc.DryRun

// Cache per-committee derived fields to avoid redundant NATS KV reads during batch reindex.
// fetched=false means GetBase failed; in that case no fields on the invite are modified.
type committeeSnapshot struct {
name string
organizationRequired bool
fetched bool
settingsFetched bool
}
committeeCache := make(map[string]committeeSnapshot)

lookupCommittee := func(committeeUID string) committeeSnapshot {
if snap, ok := committeeCache[committeeUID]; ok {
return snap
}
snap := committeeSnapshot{}
base, _, err := rc.CommitteeReader.GetBase(ctx, committeeUID)
if err != nil {
slog.WarnContext(ctx, "reindex-invites: failed to fetch committee base",
"committee_uid", committeeUID, "error", err)
committeeCache[committeeUID] = snap
return snap
}
snap.fetched = true
snap.name = base.Name
settings, _, settingsErr := rc.CommitteeReader.GetSettings(ctx, committeeUID)
if settingsErr != nil {
// Leave OrganizationRequired unchanged rather than clobbering a correctly-stored
// value with one derived from a transient settings failure.
slog.WarnContext(ctx, "reindex-invites: failed to fetch committee settings — OrganizationRequired will not be updated",
"committee_uid", committeeUID, "error", settingsErr)
} else {
snap.settingsFetched = true
businessEmailRequired := settings != nil && settings.BusinessEmailRequired
snap.organizationRequired = base.EnableVoting || businessEmailRequired
}
committeeCache[committeeUID] = snap
return snap
}

for _, invite := range invites {
snap := lookupCommittee(invite.CommitteeUID)

// Only modify invite fields when the committee lookup succeeded, to avoid
// corrupting correctly-set values on invites whose committee is temporarily unreachable.
needsKVUpdate := false
if snap.fetched {
if invite.CommitteeName == "" && snap.name != "" {
invite.CommitteeName = snap.name
needsKVUpdate = true
}
if snap.settingsFetched && invite.OrganizationRequired != snap.organizationRequired {
invite.OrganizationRequired = snap.organizationRequired
needsKVUpdate = true
}
}

if rc.DryRun {
slog.InfoContext(ctx, "dry-run: would reindex invite",
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
"committee_name", invite.CommitteeName,
"organization_required", invite.OrganizationRequired,
"kv_update_needed", needsKVUpdate,
"status", invite.Status,
)
stats.Updated++
Expand All @@ -89,22 +149,53 @@ func (s *reindexInvitesSubcommand) Run(ctx context.Context, rc commands.RunConte

failed := false

if err := publishIndexerMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish indexer message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
if needsKVUpdate {
freshInvite, rev, getErr := rc.CommitteeReader.GetInvite(ctx, invite.UID)
if getErr != nil {
slog.WarnContext(ctx, "failed to fetch invite revision for KV update",
"error", getErr,
"invite_uid", invite.UID,
)
failed = true
} else {
if freshInvite.CommitteeName == "" && snap.name != "" {
freshInvite.CommitteeName = snap.name
}
if snap.settingsFetched {
freshInvite.OrganizationRequired = snap.organizationRequired
}
if updateErr := rc.CommitteeInviteWriter.UpdateInvite(ctx, freshInvite, rev); updateErr != nil {
slog.WarnContext(ctx, "failed to update invite in NATS KV",
"error", updateErr,
"invite_uid", invite.UID,
)
failed = true
} else {
invite = freshInvite
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if err := publishAccessControlMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish access control message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
if !failed {
if err := publishIndexerMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish indexer message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
}
}

if !failed {
if err := publishAccessControlMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish access control message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
}
}

if failed {
Expand Down
Loading
Loading