Skip to content
Merged
6 changes: 6 additions & 0 deletions cmd/committee-api/design/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,12 @@ var CommitteeInviteWithReadonlyAttributes = dsl.Type("committee-invite-with-read
dsl.Example("7cad5a8d-19d0-41a4-81a6-043453daf9ee")
dsl.Format(dsl.FormatUUID)
})
dsl.Attribute("committee_name", dsl.String, "Name of the committee at the time the invite was created", func() {
dsl.Example("Technical Steering Committee")
})
dsl.Attribute("organization_required", dsl.Boolean, "Whether the invitee must supply an organization when accepting. True when the committee has voting enabled or requires a business email.", func() {
dsl.Example(false)
})
dsl.Attribute("invitee_email", dsl.String, "Email of the invited person", func() {
dsl.Format(dsl.FormatEmail)
dsl.Example("invitee@example.com")
Expand Down
43 changes: 37 additions & 6 deletions cmd/committee-api/service/committee_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ func (s *committeeServicesrvc) GetInvite(ctx context.Context, p *committeeservic
return nil, wrapError(ctx, errors.NewNotFound("invite not found in this committee"))
}

s.enrichInviteFromCommittee(ctx, invite, p.UID)

return s.convertInviteDomainToResponse(invite), nil
Comment thread
andrest50 marked this conversation as resolved.
}

Expand All @@ -657,6 +659,10 @@ func (s *committeeServicesrvc) CreateInvite(ctx context.Context, p *committeeser
return nil, wrapError(ctx, err)
}

// Best-effort: settings drive organization_required; missing settings means false.
committeeSettings, _, _ := s.storage.GetSettings(ctx, p.UID)
orgRequired := committeeBase.EnableVoting || (committeeSettings != nil && committeeSettings.BusinessEmailRequired)
Comment thread
andrest50 marked this conversation as resolved.

var inviteOrgID, inviteOrgName, inviteOrgWebsite *string
if p.Organization != nil {
inviteOrgID = p.Organization.ID
Expand All @@ -666,12 +672,14 @@ func (s *committeeServicesrvc) CreateInvite(ctx context.Context, p *committeeser
inviteOrganization := organizationPtrFromFields(inviteOrgID, inviteOrgName, inviteOrgWebsite)

invite := &model.CommitteeInvite{
UID: uuid.New().String(),
CommitteeUID: p.UID,
InviteeEmail: p.InviteeEmail,
Organization: inviteOrganization,
Status: "pending",
CreatedAt: time.Now().UTC(),
UID: uuid.New().String(),
CommitteeUID: p.UID,
CommitteeName: committeeBase.Name,
OrganizationRequired: orgRequired,
InviteeEmail: p.InviteeEmail,
Organization: inviteOrganization,
Status: "pending",
CreatedAt: time.Now().UTC(),
Comment thread
andrest50 marked this conversation as resolved.
}
if p.Role != nil {
invite.Role = *p.Role
Expand Down Expand Up @@ -707,6 +715,8 @@ func (s *committeeServicesrvc) CreateInvite(ctx context.Context, p *committeeser
return nil, wrapError(ctx, errGet)
}
revokedInvite.Status = "pending"
revokedInvite.CommitteeName = committeeBase.Name
revokedInvite.OrganizationRequired = orgRequired
if p.Role != nil {
revokedInvite.Role = *p.Role
}
Expand Down Expand Up @@ -855,6 +865,7 @@ func (s *committeeServicesrvc) RevokeInvite(ctx context.Context, p *committeeser
}

invite.Status = "revoked"
s.enrichInviteFromCommittee(ctx, invite, p.UID)
if err := s.storage.UpdateInvite(ctx, invite, rev); err != nil {
return wrapError(ctx, err)
}
Expand Down Expand Up @@ -923,6 +934,7 @@ func (s *committeeServicesrvc) AcceptInvite(ctx context.Context, p *committeeser

// Member created successfully — now mark the invite accepted.
invite.Status = "accepted"
s.enrichInviteFromCommittee(ctx, invite, p.UID)
if err := s.storage.UpdateInvite(ctx, invite, rev); err != nil {
return nil, wrapError(ctx, err)
}
Expand Down Expand Up @@ -966,6 +978,7 @@ func (s *committeeServicesrvc) DeclineInvite(ctx context.Context, p *committeese
}

invite.Status = "declined"
s.enrichInviteFromCommittee(ctx, invite, p.UID)
if err := s.storage.UpdateInvite(ctx, invite, rev); err != nil {
return nil, wrapError(ctx, err)
}
Expand Down Expand Up @@ -1311,6 +1324,24 @@ func (s *committeeServicesrvc) resolveCallerEmail(ctx context.Context) (string,
return userEmails.PrimaryEmail, nil
}

// enrichInviteFromCommittee populates invite fields derived from the committee.
// It sets CommitteeName when missing and always refreshes OrganizationRequired from
// the committee's current settings (voting enabled or business email required).
// Best-effort: errors are logged and the invite is left unchanged on failure.
func (s *committeeServicesrvc) enrichInviteFromCommittee(ctx context.Context, invite *model.CommitteeInvite, committeeUID string) {
cb, _, err := s.storage.GetBase(ctx, committeeUID)
if err != nil {
slog.WarnContext(ctx, "enrichInviteFromCommittee: failed to get committee base",
"committee_uid", committeeUID, "error", err)
return
}
if invite.CommitteeName == "" {
invite.CommitteeName = cb.Name
}
settings, _, _ := s.storage.GetSettings(ctx, committeeUID)
invite.OrganizationRequired = cb.EnableVoting || (settings != nil && settings.BusinessEmailRequired)
}

// publishInviteIndexerMessage publishes an indexer message for invite operations.
// Publishing is best-effort: failures are logged but do not fail the request.
// IndexingConfig is required because the indexer is data-agnostic; publishers supply all indexing metadata.
Expand Down
4 changes: 4 additions & 0 deletions cmd/committee-api/service/committee_service_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,10 @@ func (s *committeeServicesrvc) convertInviteDomainToResponse(invite *model.Commi
InviteeEmail: &invite.InviteeEmail,
Status: invite.Status,
}
if invite.CommitteeName != "" {
result.CommitteeName = &invite.CommitteeName
}
result.OrganizationRequired = &invite.OrganizationRequired
if invite.Role != "" {
result.Role = &invite.Role
}
Expand Down
38 changes: 38 additions & 0 deletions cmd/committee-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,44 @@ NATS_URL=nats://localhost:4222 \
committee-cli sync members-by-committee-index --committee-uid=abc-123
```

#### `sync reindex-invites`

Re-publishes all committee invites from NATS KV to OpenSearch (via the indexer) and OpenFGA (via fga-sync). For each invite, the command fetches the parent committee's base and settings, backfills `committee_name` (if missing) and recomputes `organization_required` (`enable_voting || business_email_required`), persists any changed fields back to NATS KV, then publishes the updated invite to the indexer and access-control subjects.

Committee base and settings are fetched once per unique committee UID and cached for the duration of the run. If the committee fetch fails the invite is published as-is without modifying its stored fields.

**Subcommand flags**

| Flag | Default | Description |
|---|---|---|
| `--committee-uid` | `""` | Limit reindex to invites of a single committee |
| `--sleep` | `0` | Wait between each invite publish (e.g. `200ms`, `1s`) |
| `--dry-run` | `false` | Log what would be published and updated without writing |

**Exit code:** `0` if no invites failed, `1` otherwise.

**Output:** Structured JSON log line on completion with fields `total`, `updated`, `skipped`, `failed`, `duration_ms`, `rate_per_sec`.

**Examples**

Dry-run to preview scope (safe first step):
```sh
NATS_URL=nats://localhost:4222 LOG_LEVEL=info \
committee-cli sync reindex-invites --dry-run
```

Full reindex with a 200ms pause between invites:
```sh
NATS_URL=nats://localhost:4222 \
committee-cli sync reindex-invites --sleep=200ms
```

Reindex a single committee's invites:
```sh
NATS_URL=nats://localhost:4222 \
committee-cli sync reindex-invites --committee-uid=abc-123
```

## Building

### Local binary
Expand Down
3 changes: 3 additions & 0 deletions cmd/committee-cli/commands/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ type RunContext struct {
// (e.g. IndexMemberByCommittee). This is used by data-repair subcommands that bypass the
// business-logic orchestrator and write to the storage layer directly.
CommitteeMemberWriter port.CommitteeMemberWriter
// CommitteeInviteWriter provides direct storage-layer access to invite write operations
// (e.g. backfilling fields during reindex). Bypasses the business-logic orchestrator.
CommitteeInviteWriter port.CommitteeInviteWriter
// Publisher provides direct access to indexer and access-control messaging (e.g. reindex
// subcommands that need to publish without going through the writer orchestrator).
Publisher port.CommitteePublisher
Expand Down
99 changes: 85 additions & 14 deletions cmd/committee-cli/commands/sync/reindex_invites.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ func (s *reindexInvitesSubcommand) Run(ctx context.Context, rc commands.RunConte
if rc.Publisher == nil {
return errs.NewUnexpected("publisher is not wired in RunContext")
}
if rc.CommitteeInviteWriter == nil {
return errs.NewUnexpected("CommitteeInviteWriter is not wired in RunContext")
}

ctx = context.WithValue(ctx, constants.AuthorizationContextID, "Bearer lfx-v2-committee-service")

Expand All @@ -75,12 +78,59 @@ func (s *reindexInvitesSubcommand) Run(ctx context.Context, rc commands.RunConte
stats.Total = len(invites)
stats.DryRun = rc.DryRun

// Cache per-committee derived fields to avoid redundant NATS KV reads during batch reindex.
// fetched=false means GetBase failed; in that case no fields on the invite are modified.
type committeeSnapshot struct {
name string
organizationRequired bool
fetched bool
}
committeeCache := make(map[string]committeeSnapshot)

lookupCommittee := func(committeeUID string) committeeSnapshot {
if snap, ok := committeeCache[committeeUID]; ok {
return snap
}
snap := committeeSnapshot{}
base, _, err := rc.CommitteeReader.GetBase(ctx, committeeUID)
if err != nil {
slog.WarnContext(ctx, "reindex-invites: failed to fetch committee base",
"committee_uid", committeeUID, "error", err)
committeeCache[committeeUID] = snap
return snap
}
snap.fetched = true
snap.name = base.Name
settings, _, _ := rc.CommitteeReader.GetSettings(ctx, committeeUID)
snap.organizationRequired = base.EnableVoting || (settings != nil && settings.BusinessEmailRequired)
committeeCache[committeeUID] = snap
return snap
Comment thread
andrest50 marked this conversation as resolved.
}

for _, invite := range invites {
snap := lookupCommittee(invite.CommitteeUID)

// Only modify invite fields when the committee lookup succeeded, to avoid
// corrupting correctly-set values on invites whose committee is temporarily unreachable.
needsKVUpdate := false
if snap.fetched {
if invite.CommitteeName == "" && snap.name != "" {
invite.CommitteeName = snap.name
needsKVUpdate = true
}
if invite.OrganizationRequired != snap.organizationRequired {
invite.OrganizationRequired = snap.organizationRequired
needsKVUpdate = true
}
Comment thread
andrest50 marked this conversation as resolved.
Outdated
}

if rc.DryRun {
slog.InfoContext(ctx, "dry-run: would reindex invite",
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
"committee_name", invite.CommitteeName,
"organization_required", invite.OrganizationRequired,
"kv_update_needed", needsKVUpdate,
"status", invite.Status,
)
stats.Updated++
Expand All @@ -89,22 +139,43 @@ func (s *reindexInvitesSubcommand) Run(ctx context.Context, rc commands.RunConte

failed := false

if err := publishIndexerMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish indexer message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
if needsKVUpdate {
_, rev, getErr := rc.CommitteeReader.GetInvite(ctx, invite.UID)
if getErr != nil {
slog.WarnContext(ctx, "failed to fetch invite revision for KV update",
"error", getErr,
"invite_uid", invite.UID,
)
failed = true
} else if updateErr := rc.CommitteeInviteWriter.UpdateInvite(ctx, invite, rev); updateErr != nil {
slog.WarnContext(ctx, "failed to update invite in NATS KV",
"error", updateErr,
"invite_uid", invite.UID,
)
failed = true
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if err := publishAccessControlMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish access control message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
if !failed {
if err := publishIndexerMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish indexer message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
}
}

if !failed {
if err := publishAccessControlMessage(ctx, rc, invite); err != nil {
slog.WarnContext(ctx, "failed to publish access control message",
"error", err,
"invite_uid", invite.UID,
"committee_uid", invite.CommitteeUID,
)
failed = true
}
}

if failed {
Expand Down
Loading
Loading