Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 14 additions & 56 deletions internal/service/committee_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/linuxfoundation/lfx-v2-committee-service/internal/domain/port"
errs "github.com/linuxfoundation/lfx-v2-committee-service/pkg/errors"
"github.com/linuxfoundation/lfx-v2-committee-service/pkg/fields"
"github.com/linuxfoundation/lfx-v2-committee-service/pkg/log"
)

// CommitteeReader defines the interface for committee read operations
Expand Down Expand Up @@ -58,49 +59,33 @@ type committeeReaderOrchestrator struct {
// GetBase retrieves committee base information by UID
func (rc *committeeReaderOrchestrator) GetBase(ctx context.Context, uid string) (*model.CommitteeBase, uint64, error) {

slog.DebugContext(ctx, "executing get committee base use case",
"committee_uid", uid,
)
ctx = log.AppendCtx(ctx, slog.String("committee_uid", uid))
slog.DebugContext(ctx, "executing get committee base use case")
Comment on lines +62 to +63

// Get committee base from storage
committeeBase, revision, err := rc.committeeReader.GetBase(ctx, uid)
if err != nil {
slog.ErrorContext(ctx, "failed to get committee base",
"error", err,
"committee_uid", uid,
)
return nil, 0, err
}

slog.DebugContext(ctx, "committee base retrieved successfully",
"committee_uid", uid,
"revision", revision,
)
slog.DebugContext(ctx, "committee base retrieved successfully", "revision", revision)

return committeeBase, revision, nil
}

// GetSettings retrieves committee settings by UID
func (rc *committeeReaderOrchestrator) GetSettings(ctx context.Context, uid string) (*model.CommitteeSettings, uint64, error) {

slog.DebugContext(ctx, "executing get committee settings use case",
"committee_uid", uid,
)
ctx = log.AppendCtx(ctx, slog.String("committee_uid", uid))
slog.DebugContext(ctx, "executing get committee settings use case")

// Get committee settings from storage
committeeSettings, revision, err := rc.committeeReader.GetSettings(ctx, uid)
if err != nil {
slog.ErrorContext(ctx, "failed to get committee settings",
"error", err,
"committee_uid", uid,
)
return nil, 0, err
}

slog.DebugContext(ctx, "committee settings retrieved successfully",
"committee_uid", uid,
"revision", revision,
)
slog.DebugContext(ctx, "committee settings retrieved successfully", "revision", revision)

return committeeSettings, revision, nil
}
Expand Down Expand Up @@ -129,72 +114,45 @@ func (rc *committeeReaderOrchestrator) GetMemberRevision(ctx context.Context, me
// GetMember retrieves a committee member by committee UID and member UID
func (rc *committeeReaderOrchestrator) GetMember(ctx context.Context, committeeUID, memberUID string) (*model.CommitteeMember, uint64, error) {

slog.DebugContext(ctx, "executing get committee member use case",
"committee_uid", committeeUID,
"member_uid", memberUID,
)
ctx = log.AppendCtx(ctx, slog.String("committee_uid", committeeUID))
ctx = log.AppendCtx(ctx, slog.String("member_uid", memberUID))
slog.DebugContext(ctx, "executing get committee member use case")

// First, verify that the committee exists
_, _, err := rc.committeeReader.GetBase(ctx, committeeUID)
if err != nil {
slog.ErrorContext(ctx, "failed to get committee base - committee does not exist",
"error", err,
"committee_uid", committeeUID,
)
return nil, 0, err
}

// Get committee member from storage
committeeMember, revision, err := rc.committeeReader.GetMember(ctx, memberUID)
if err != nil {
slog.ErrorContext(ctx, "failed to get committee member",
"error", err,
"committee_uid", committeeUID,
"member_uid", memberUID,
)
return nil, 0, err
}

// Verify that the member belongs to the requested committee
if committeeMember.CommitteeUID != committeeUID {
slog.ErrorContext(ctx, "committee member does not belong to the requested committee",
"committee_uid", committeeUID,
"member_uid", memberUID,
"member_committee_uid", committeeMember.CommitteeUID,
)
return nil, 0, errs.NewValidation("committee member does not belong to the requested committee")
}

slog.DebugContext(ctx, "committee member retrieved successfully",
"committee_uid", committeeUID,
"member_uid", memberUID,
"revision", revision,
)
slog.DebugContext(ctx, "committee member retrieved successfully", "revision", revision)

return committeeMember, revision, nil
}

// ListMembers retrieves all members for a given committee UID
func (rc *committeeReaderOrchestrator) ListMembers(ctx context.Context, committeeUID string) ([]*model.CommitteeMember, error) {

slog.DebugContext(ctx, "executing list committee members use case",
"committee_uid", committeeUID,
)
ctx = log.AppendCtx(ctx, slog.String("committee_uid", committeeUID))
slog.DebugContext(ctx, "executing list committee members use case")

// Get all committee members from storage
members, err := rc.committeeReader.ListMembers(ctx, committeeUID)
if err != nil {
slog.ErrorContext(ctx, "failed to list committee members",
"error", err,
"committee_uid", committeeUID,
)
return nil, err
}

slog.DebugContext(ctx, "committee members retrieved successfully",
"committee_uid", committeeUID,
"member_count", len(members),
)
slog.DebugContext(ctx, "committee members retrieved successfully", "member_count", len(members))

return members, nil
}
Expand Down
74 changes: 10 additions & 64 deletions internal/service/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/linuxfoundation/lfx-v2-committee-service/pkg/constants"
"github.com/linuxfoundation/lfx-v2-committee-service/pkg/errors"
"github.com/linuxfoundation/lfx-v2-committee-service/pkg/fields"
"github.com/linuxfoundation/lfx-v2-committee-service/pkg/log"
)

// messageHandlerOrchestrator orchestrates the message handling process
Expand Down Expand Up @@ -63,44 +64,29 @@ func (m *messageHandlerOrchestrator) HandleCommitteeGetAttribute(ctx context.Con
// Parse message data to extract committee UID
uid := string(msg.Data())

slog.DebugContext(ctx, "committee get name request",
"committee_uid", uid,
"attribute", attribute,
)
ctx = log.AppendCtx(ctx, slog.String("committee_uid", uid))
ctx = log.AppendCtx(ctx, slog.String("attribute", attribute))
slog.DebugContext(ctx, "committee get name request")
Comment on lines +67 to +69

// Validate that the committee ID is a valid UUID.
_, err := uuid.Parse(uid)
if err != nil {
slog.ErrorContext(ctx, "error parsing committee ID", "error", err)
return nil, err
}

// Use the committee reader to get the committee base information
committee, _, err := m.committeeReader.GetBase(ctx, uid)
if err != nil {
slog.ErrorContext(ctx, "failed to get committee base",
"error", err,
"committee_uid", uid,
)
return nil, err
}

value, ok := fields.LookupByTag(committee, "json", attribute)
if !ok {
slog.ErrorContext(ctx, "attribute not found in committee",
"attribute", attribute,
"committee_uid", uid,
)
return nil, errors.NewNotFound(fmt.Sprintf("attribute %s not found in committee %s", attribute, uid))
}

strValue, ok := value.(string)
if !ok {
slog.ErrorContext(ctx, "attribute value is not a string",
"attribute", attribute,
"committee_uid", uid,
"value_type", fmt.Sprintf("%T", value),
)
return nil, errors.NewValidation(fmt.Sprintf("attribute %s value is not a string", attribute))
}

Expand All @@ -113,51 +99,34 @@ func (m *messageHandlerOrchestrator) HandleCommitteeListMembers(ctx context.Cont
// Parse message data to extract committee UID
uid := string(msg.Data())

slog.DebugContext(ctx, "committee list members request",
"committee_uid", uid,
)
ctx = log.AppendCtx(ctx, slog.String("committee_uid", uid))
slog.DebugContext(ctx, "committee list members request")

// Validate that the committee ID is a valid UUID.
_, err := uuid.Parse(uid)
if err != nil {
slog.ErrorContext(ctx, "error parsing committee ID", "error", err)
return nil, err
}

// Check if the committee exists first
_, _, err = m.committeeReader.GetBase(ctx, uid)
if err != nil {
slog.ErrorContext(ctx, "failed to get committee base",
"error", err,
"committee_uid", uid,
)
return nil, err
}

// Get all members for the committee
members, err := m.committeeReader.ListMembers(ctx, uid)
if err != nil {
slog.ErrorContext(ctx, "failed to list committee members",
"error", err,
"committee_uid", uid,
)
return nil, err
}

// Marshal the members to JSON
membersJSON, err := json.Marshal(members)
if err != nil {
slog.ErrorContext(ctx, "failed to marshal committee members",
"error", err,
"committee_uid", uid,
)
return nil, errors.NewUnexpected("failed to marshal committee members", err)
}

slog.DebugContext(ctx, "committee list members response",
"committee_uid", uid,
"member_count", len(members),
)
slog.DebugContext(ctx, "committee list members response", "member_count", len(members))

return membersJSON, nil
}
Expand All @@ -167,7 +136,6 @@ func (m *messageHandlerOrchestrator) HandleCommitteeListMembers(ctx context.Cont
func (m *messageHandlerOrchestrator) HandleCommitteeMailingListChanged(ctx context.Context, msg port.TransportMessenger) ([]byte, error) {
var event model.CommitteeMailingListChangedEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
slog.ErrorContext(ctx, "failed to unmarshal CommitteeMailingListChangedEvent", "error", err)
return nil, err
}

Expand All @@ -183,8 +151,6 @@ func (m *messageHandlerOrchestrator) HandleCommitteeMailingListChanged(ctx conte

committee, changed, err := m.committeeWriter.UpdateHasMailingList(ctx, event.CommitteeUID, event.HasMailingList)
if err != nil {
slog.ErrorContext(ctx, "failed to update has_mailing_list",
"committee_uid", event.CommitteeUID, "error", err)
return nil, err
}
if !changed {
Expand All @@ -202,15 +168,11 @@ func (m *messageHandlerOrchestrator) HandleCommitteeMailingListChanged(ctx conte

indexerMsg, err := buildIndexerMessage(ctx, model.ActionUpdated, committee, fullCommittee.Tags())
if err != nil {
slog.ErrorContext(ctx, "failed to build indexer message",
"committee_uid", event.CommitteeUID, "error", err)
return nil, err
}
indexerMsg.IndexingConfig = buildCommitteeIndexingConfig(fullCommittee)

if err := m.committeePublisher.Indexer(ctx, constants.IndexCommitteeSubject, indexerMsg, false); err != nil {
slog.ErrorContext(ctx, "failed to publish committee indexer update",
"committee_uid", event.CommitteeUID, "error", err)
return nil, err
}

Expand All @@ -229,7 +191,6 @@ func (m *messageHandlerOrchestrator) HandleCommitteeUpdated(ctx context.Context,

var event model.CommitteeEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
slog.ErrorContext(ctx, "failed to unmarshal CommitteeEvent", "error", err)
return nil, err
}

Expand Down Expand Up @@ -267,8 +228,6 @@ func (m *messageHandlerOrchestrator) HandleCommitteeUpdated(ctx context.Context,

members, err := m.committeeReader.ListMembers(ctx, data.CommitteeUID)
if err != nil {
slog.ErrorContext(ctx, "failed to list members for sync",
"committee_uid", data.CommitteeUID, "error", err)
return nil, err
}

Expand Down Expand Up @@ -333,7 +292,6 @@ func (m *messageHandlerOrchestrator) HandleCommitteeTotalMembersSync(ctx context

var event model.CommitteeEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
slog.ErrorContext(ctx, "failed to unmarshal CommitteeEvent for total_members sync", "error", err)
return err
}

Expand All @@ -357,46 +315,34 @@ func (m *messageHandlerOrchestrator) HandleCommitteeTotalMembersSync(ctx context
committeeUID := member.CommitteeUID

ctx = context.WithValue(ctx, constants.AuthorizationContextID, "Bearer lfx-v2-committee-service")
ctx = log.AppendCtx(ctx, slog.String("committee_uid", committeeUID))

slog.DebugContext(ctx, "starting total_members sync",
"committee_uid", committeeUID,
"subject", subject,
)
slog.DebugContext(ctx, "starting total_members sync", "subject", subject)

Comment on lines 317 to 321
members, err := m.committeeReader.ListMembers(ctx, committeeUID)
if err != nil {
slog.ErrorContext(ctx, "failed to list members for total_members sync",
"committee_uid", committeeUID, "error", err)
return err
}
actualCount := len(members)

committee, revision, err := m.committeeReader.GetBase(ctx, committeeUID)
if err != nil {
slog.ErrorContext(ctx, "failed to get committee base for total_members sync",
"committee_uid", committeeUID, "error", err)
return err
}

if committee.TotalMembers == actualCount {
slog.DebugContext(ctx, "total_members already correct — skipping update",
"committee_uid", committeeUID,
"total_members", actualCount,
)
slog.DebugContext(ctx, "total_members already correct — skipping update", "total_members", actualCount)
return nil
}

slog.DebugContext(ctx, "updating total_members counter",
"committee_uid", committeeUID,
"previous", committee.TotalMembers,
"actual", actualCount,
)

committee.TotalMembers = actualCount

if _, err := m.committeeWriterOrchestrator.Update(ctx, &model.Committee{CommitteeBase: *committee}, revision, false); err != nil {
slog.ErrorContext(ctx, "failed to update committee total_members",
"committee_uid", committeeUID, "error", err)
return err
}

Expand Down
Loading