package cmd

import (
	"fmt"
	"os"

	"github.com/brunseba/cdevents-tools/pkg/events"
	"github.com/brunseba/cdevents-tools/pkg/output"
	"github.com/cdevents/sdk-go/pkg/api"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// generateCmd is the parent of every "generate <subject>" subcommand. It has
// no Run function of its own; it only groups the subcommands and carries the
// shared help text.
var generateCmd = &cobra.Command{
	Use:   "generate",
	Short: "Generate CDEvents",
	Long:  `Generate CDEvents for various CI/CD activities. Supported event types: - pipeline: Pipeline run events (queued, started, finished) - task: Task run events (started, finished) - build: Build events (queued, started, finished) - service: Service deployment events (deployed, published, removed, rolledback, upgraded) - test: Test events (testcase-queued, testcase-started, testcase-finished, etc.) Examples: # Generate a pipeline started event cdevents-cli generate pipeline started --id "pipeline-123" --name "my-pipeline" # Generate a build finished event with outcome cdevents-cli generate build finished --id "build-456" --name "my-build" --outcome "success" # Generate a task started event with custom data cdevents-cli generate task started --id "task-101" --name "my-task" --custom-json '{"key":"value"}' # Generate a service deployed event cdevents-cli generate service deployed --id "service-789" --name "my-service" --environment "prod"`,
}

func init() {
	rootCmd.AddCommand(generateCmd)
}

// addCommonGenerateFlags registers the flags shared by the event subcommands
// on cmd: id and name (both marked required), source, url, outcome, errors and
// custom-json.
func addCommonGenerateFlags(cmd *cobra.Command) {
	flags := cmd.Flags()

	// String flags that all default to the empty string.
	shared := []struct {
		name, shorthand, usage string
	}{
		{"id", "i", "Subject ID (required)"},
		{"name", "n", "Subject name (required)"},
		{"source", "s", "Event source (defaults to hostname)"},
		{"url", "u", "Subject URL"},
		{"outcome", "", "Outcome for finished events (success, failure, error, cancel)"},
		{"errors", "", "Error details for failed events"},
	}
	for _, f := range shared {
		flags.StringP(f.name, f.shorthand, "", f.usage)
	}

	// The returned errors are deliberately discarded, as before: both flags
	// were registered a few lines above.
	_ = cmd.MarkFlagRequired("id")
	_ = cmd.MarkFlagRequired("name")

	// Custom data flag
	flags.String("custom-json", "", "Custom data in JSON format")
}

// parseCustomData reads the --custom-json flag of cmd and parses it.
// It returns (nil, nil) when the flag is empty, and an error when the flag
// cannot be read or events.ParseCustomDataFromJSON rejects its value.
func parseCustomData(cmd *cobra.Command) (*events.CustomData, error) {
	raw, err := cmd.Flags().GetString("custom-json")
	if err != nil {
		return nil, err
	}
	if raw == "" {
		// No custom data requested.
		return nil, nil
	}
	return events.ParseCustomDataFromJSON(raw)
}

// getDefaultSource returns the event source to use: the viper "source" setting
// when it is non-empty, otherwise "cdevents-cli/<hostname>", or plain
// "cdevents-cli" when the hostname cannot be determined.
func getDefaultSource() string {
	configured := viper.GetString("source")
	if configured != "" {
		return configured
	}

	host, err := os.Hostname()
	if err == nil {
		return fmt.Sprintf("cdevents-cli/%s", host)
	}
	return "cdevents-cli"
}

// outputEvent prints event in the given format without any custom data.
func outputEvent(event interface{}, format string) error {
	return outputEventWithCustomData(event, nil, format)
}

// outputEventWithCustomData renders event, together with customData when it is
// non-nil, in the given format and prints the result to standard output.
// It returns an error when event is not an api.CDEvent or formatting fails.
func outputEventWithCustomData(event interface{}, customData *events.CustomData, format string) error {
	cdEvent, isCDEvent := event.(api.CDEvent)
	if !isCDEvent {
		return fmt.Errorf("invalid event type")
	}

	// The output package declares its own CustomData type, so copy the fields
	// across instead of passing the events.CustomData pointer through.
	var extra *output.CustomData
	if customData != nil {
		extra = &output.CustomData{
			Data:        customData.Data,
			ContentType: customData.ContentType,
		}
	}

	rendered, err := output.FormatOutputWithCustomData(cdEvent, extra, format)
	if err != nil {
		return fmt.Errorf("failed to format output: %w", err)
	}

	fmt.Print(rendered)
	return nil
}
package cmd

import (
	"fmt"

	"github.com/brunseba/cdevents-tools/pkg/events"
	"github.com/spf13/cobra"
)

// generateBuildCmd implements "generate build <event-type>". The first
// positional argument selects the build event type; the event is built from
// the common generate flags and printed in the format given by --output.
var generateBuildCmd = &cobra.Command{
	Use:   "build",
	Short: "Generate build events",
	Args:  cobra.MinimumNArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		// Honor an explicit --source. getDefaultSource only consults viper
		// and the hostname, and the flag itself is not bound to viper.
		source, err := cmd.Flags().GetString("source")
		if err != nil {
			return fmt.Errorf("failed to read source flag: %w", err)
		}
		if source == "" {
			source = getDefaultSource()
		}

		factory := events.NewEventFactory(source)
		eventType := args[0]

		// Parse custom data
		customData, err := parseCustomData(cmd)
		if err != nil {
			return fmt.Errorf("failed to parse custom data: %w", err)
		}

		flagValue := func(name string) string {
			return cmd.Flag(name).Value.String()
		}

		event, err := factory.CreateBuildEvent(
			eventType,
			flagValue("id"),
			flagValue("name"),
			flagValue("outcome"),
			flagValue("errors"),
			flagValue("url"),
			customData,
		)
		if err != nil {
			return fmt.Errorf("failed to create build event: %w", err)
		}

		// Custom data is merged in by the output formatter, so it has to be
		// handed over here; plain outputEvent would drop --custom-json.
		return outputEventWithCustomData(event, customData, flagValue("output"))
	},
}

func init() {
	addCommonGenerateFlags(generateBuildCmd)
	generateCmd.AddCommand(generateBuildCmd)
}
package cmd

import (
	"fmt"

	"github.com/brunseba/cdevents-tools/pkg/events"
	"github.com/spf13/cobra"
)

// generatePipelineCmd implements "generate pipeline <event-type>". The first
// positional argument selects the pipeline run event type; the event is built
// from the common generate flags and printed in the format given by --output.
var generatePipelineCmd = &cobra.Command{
	Use:   "pipeline",
	Short: "Generate pipeline events",
	Args:  cobra.MinimumNArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		// Honor an explicit --source. getDefaultSource only consults viper
		// and the hostname, and the flag itself is not bound to viper.
		source, err := cmd.Flags().GetString("source")
		if err != nil {
			return fmt.Errorf("failed to read source flag: %w", err)
		}
		if source == "" {
			source = getDefaultSource()
		}

		factory := events.NewEventFactory(source)
		eventType := args[0]

		// Parse custom data
		customData, err := parseCustomData(cmd)
		if err != nil {
			return fmt.Errorf("failed to parse custom data: %w", err)
		}

		flagValue := func(name string) string {
			return cmd.Flag(name).Value.String()
		}

		event, err := factory.CreatePipelineRunEvent(
			eventType,
			flagValue("id"),
			flagValue("name"),
			flagValue("outcome"),
			flagValue("errors"),
			flagValue("url"),
			customData,
		)
		if err != nil {
			return fmt.Errorf("failed to create pipeline event: %w", err)
		}

		// Custom data is merged in by the output formatter.
		return outputEventWithCustomData(event, customData, flagValue("output"))
	},
}

func init() {
	addCommonGenerateFlags(generatePipelineCmd)
	generateCmd.AddCommand(generatePipelineCmd)
}
package cmd

import (
	"fmt"

	"github.com/brunseba/cdevents-tools/pkg/events"
	"github.com/spf13/cobra"
)

// generateServiceCmd implements "generate service <event-type>". The first
// positional argument selects the service event type; the event is built from
// the common generate flags plus --environment and printed in the format
// given by --output.
var generateServiceCmd = &cobra.Command{
	Use:   "service",
	Short: "Generate service events",
	Args:  cobra.MinimumNArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		// Honor an explicit --source. getDefaultSource only consults viper
		// and the hostname, and the flag itself is not bound to viper.
		source, err := cmd.Flags().GetString("source")
		if err != nil {
			return fmt.Errorf("failed to read source flag: %w", err)
		}
		if source == "" {
			source = getDefaultSource()
		}

		factory := events.NewEventFactory(source)
		eventType := args[0]

		// Parse custom data
		customData, err := parseCustomData(cmd)
		if err != nil {
			return fmt.Errorf("failed to parse custom data: %w", err)
		}

		flagValue := func(name string) string {
			return cmd.Flag(name).Value.String()
		}

		event, err := factory.CreateServiceEvent(
			eventType,
			flagValue("id"),
			flagValue("name"),
			flagValue("environment"),
			flagValue("url"),
			customData,
		)
		if err != nil {
			return fmt.Errorf("failed to create service event: %w", err)
		}

		// Custom data is merged in by the output formatter, so it has to be
		// handed over here; plain outputEvent would drop --custom-json.
		return outputEventWithCustomData(event, customData, flagValue("output"))
	},
}

func init() {
	addCommonGenerateFlags(generateServiceCmd)
	generateServiceCmd.Flags().StringP("environment", "e", "", "Environment ID")
	generateCmd.AddCommand(generateServiceCmd)
}
package cmd

import (
	"fmt"

	"github.com/brunseba/cdevents-tools/pkg/events"
	"github.com/spf13/cobra"
)

// generateTaskCmd implements "generate task <event-type>". The first
// positional argument selects the task run event type; the event is built from
// the common generate flags plus --pipeline and printed in the format given by
// --output.
var generateTaskCmd = &cobra.Command{
	Use:   "task",
	Short: "Generate task events",
	Args:  cobra.MinimumNArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		// Honor an explicit --source. getDefaultSource only consults viper
		// and the hostname, and the flag itself is not bound to viper.
		source, err := cmd.Flags().GetString("source")
		if err != nil {
			return fmt.Errorf("failed to read source flag: %w", err)
		}
		if source == "" {
			source = getDefaultSource()
		}

		factory := events.NewEventFactory(source)
		eventType := args[0]

		// Parse custom data
		customData, err := parseCustomData(cmd)
		if err != nil {
			return fmt.Errorf("failed to parse custom data: %w", err)
		}

		flagValue := func(name string) string {
			return cmd.Flag(name).Value.String()
		}

		event, err := factory.CreateTaskRunEvent(
			eventType,
			flagValue("id"),
			flagValue("name"),
			flagValue("pipeline"),
			flagValue("outcome"),
			flagValue("errors"),
			flagValue("url"),
			customData,
		)
		if err != nil {
			return fmt.Errorf("failed to create task event: %w", err)
		}

		// Custom data is merged in by the output formatter, so it has to be
		// handed over here; plain outputEvent would drop --custom-json.
		return outputEventWithCustomData(event, customData, flagValue("output"))
	},
}

func init() {
	addCommonGenerateFlags(generateTaskCmd)
	generateTaskCmd.Flags().StringP("pipeline", "p", "", "Pipeline ID")
	generateCmd.AddCommand(generateTaskCmd)
}
package cmd

import (
	"errors"
	"fmt"
	"os"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// cfgFile holds the value of the --config flag; empty means "search the home
// directory for .cdevents-cli".
var cfgFile string

// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
	Use:     "cdevents-cli",
	Short:   "A CLI tool for generating and sending CDEvents",
	Long:    `CDEvents CLI is a command-line tool for generating and sending CDEvents into CI/CD toolchains using CloudEvents as transport. CDEvents is a common specification for Continuous Delivery events, enabling interoperability in the complete software production ecosystem. This tool supports: - Generating various CDEvents (pipeline, task, build, deployment, etc.) - Sending events via HTTP, Kafka, or other transports - Loading event templates from YAML files - Integration with CI/CD systems`,
	Version: "0.1.0",
}

// Execute adds all child commands to the root command and sets flags appropriately.
// It returns whatever error the executed command produced.
func Execute() error {
	return rootCmd.Execute()
}

func init() {
	cobra.OnInitialize(initConfig)

	// Global flags
	rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.cdevents-cli.yaml)")
	rootCmd.PersistentFlags().StringP("output", "o", "json", "output format (json, yaml, cloudevent)")
	rootCmd.PersistentFlags().BoolP("verbose", "v", false, "verbose output")

	// Bind flags to viper
	viper.BindPFlag("output", rootCmd.PersistentFlags().Lookup("output"))
	viper.BindPFlag("verbose", rootCmd.PersistentFlags().Lookup("verbose"))
}

// initConfig reads in config file and ENV variables if set.
//
// With --config the given file is used; otherwise a YAML file named
// ".cdevents-cli" is looked up in the user's home directory. A config file
// that simply does not exist on the search path is not an error. Any other
// read failure (an unreadable or malformed file, or a missing file named
// explicitly with --config) is reported on stderr, and the command continues
// with flags and environment only.
func initConfig() {
	if cfgFile != "" {
		// Use config file from the flag.
		viper.SetConfigFile(cfgFile)
	} else {
		// Find home directory.
		home, err := os.UserHomeDir()
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error getting home directory: %v\n", err)
			os.Exit(1)
		}

		// Search config in home directory with name ".cdevents-cli" (without extension).
		viper.AddConfigPath(home)
		viper.SetConfigType("yaml")
		viper.SetConfigName(".cdevents-cli")
	}

	viper.AutomaticEnv() // read in environment variables that match

	// If a config file is found, read it in.
	if err := viper.ReadInConfig(); err != nil {
		// Stay quiet only for "no config file on the search path"; everything
		// else means the user's configuration was not applied.
		var notFound viper.ConfigFileNotFoundError
		if !errors.As(err, &notFound) {
			fmt.Fprintf(os.Stderr, "Warning: could not read config file: %v\n", err)
		}
		return
	}

	if viper.GetBool("verbose") {
		fmt.Fprintf(os.Stderr, "Using config file: %s\n", viper.ConfigFileUsed())
	}
}
package cmd

import (
	"context"
	"fmt"
	"time"

	"github.com/brunseba/cdevents-tools/pkg/transport"
	"github.com/cdevents/sdk-go/pkg/api"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// sendCmd is the parent of every "send <subject>" subcommand. It has no Run
// function of its own; it carries the transport flags shared by the
// subcommands.
var sendCmd = &cobra.Command{
	Use:   "send",
	Short: "Send CDEvents to various targets",
	Long:  `Send CDEvents to various targets such as HTTP endpoints, files, or console. This command combines event generation and transmission in one step. Examples: # Send a pipeline started event via HTTP cdevents-cli send --target http://localhost:8080/events pipeline started --id "pipeline-123" --name "my-pipeline" # Send a build finished event to console cdevents-cli send --target console build finished --id "build-456" --name "my-build" --outcome "success" # Send a service deployed event to a file cdevents-cli send --target file://events.json service deployed --id "service-789" --name "my-service"`,
}

func init() {
	rootCmd.AddCommand(sendCmd)

	// Add transport-specific flags
	sendCmd.PersistentFlags().StringP("target", "t", "console", "Target to send events to (console, http://..., file://...)")
	sendCmd.PersistentFlags().IntP("retries", "r", 3, "Number of retry attempts")
	sendCmd.PersistentFlags().DurationP("timeout", "", 30*time.Second, "Request timeout")
	// NOTE(review): "headers" is registered and bound below, but nothing in
	// this file reads it or passes it on to the transport.
	sendCmd.PersistentFlags().StringSliceP("headers", "H", []string{}, "HTTP headers (format: key=value)")

	// Bind flags to viper
	viper.BindPFlag("target", sendCmd.PersistentFlags().Lookup("target"))
	viper.BindPFlag("retries", sendCmd.PersistentFlags().Lookup("retries"))
	viper.BindPFlag("timeout", sendCmd.PersistentFlags().Lookup("timeout"))
	viper.BindPFlag("headers", sendCmd.PersistentFlags().Lookup("headers"))
}

// sendEvent sends an event using the specified transport.
//
// event must be an api.CDEvent. target selects the transport through
// transport.TransportFactory. timeout bounds the whole operation, including
// all retries. With retries > 0 the send is attempted up to retries+1 times;
// otherwise exactly once.
func sendEvent(event interface{}, target string, retries int, timeout time.Duration) error {
	cdEvent, ok := event.(api.CDEvent)
	if !ok {
		return fmt.Errorf("invalid event type")
	}

	// Named "sender" rather than "transport" so the package name is not
	// shadowed for the rest of the function.
	sender, err := transport.NewTransportFactory().CreateTransport(target)
	if err != nil {
		return fmt.Errorf("failed to create transport: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	if retries > 0 {
		return SendEventWithRetry(ctx, sender, cdEvent, retries)
	}
	return sender.Send(ctx, cdEvent)
}

// SendEventWithRetry sends an event with retry logic.
//
// The event is sent once and, on failure, retried up to maxRetries more times
// without any delay between attempts. A negative maxRetries is treated as 0,
// so at least one attempt is always made. Retrying stops early once ctx is
// cancelled or past its deadline. It returns nil on the first successful
// send; otherwise the returned error wraps the last send error, or the
// context error when retrying was abandoned.
func SendEventWithRetry(ctx context.Context, transport transport.Transport, event api.CDEvent, maxRetries int) error {
	if maxRetries < 0 {
		maxRetries = 0
	}

	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		// A retry cannot succeed once the context is done, so stop instead of
		// burning through the remaining attempts.
		if attempt > 0 {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return fmt.Errorf("failed to send event: %w (last error: %v)", ctxErr, lastErr)
			}
		}

		lastErr = transport.Send(ctx, event)
		if lastErr == nil {
			return nil
		}
	}

	return fmt.Errorf("failed to send event after %d retries: %w", maxRetries, lastErr)
}
package cmd

import (
	"fmt"

	"github.com/brunseba/cdevents-tools/pkg/events"
	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

// sendPipelineCmd implements "send pipeline <event-type>". It builds a
// pipeline run event from the common generate flags and sends it to the
// target read from the viper "target", "retries" and "timeout" settings.
var sendPipelineCmd = &cobra.Command{
	Use:   "pipeline",
	Short: "Send pipeline events",
	Args:  cobra.MinimumNArgs(1),
	RunE: func(cmd *cobra.Command, args []string) error {
		// Honor an explicit --source. getDefaultSource only consults viper
		// and the hostname, and the flag itself is not bound to viper.
		source, err := cmd.Flags().GetString("source")
		if err != nil {
			return fmt.Errorf("failed to read source flag: %w", err)
		}
		if source == "" {
			source = getDefaultSource()
		}

		factory := events.NewEventFactory(source)
		eventType := args[0]

		customData, err := parseCustomData(cmd)
		if err != nil {
			return fmt.Errorf("failed to parse custom data: %w", err)
		}

		flagValue := func(name string) string {
			return cmd.Flag(name).Value.String()
		}

		event, err := factory.CreatePipelineRunEvent(
			eventType,
			flagValue("id"),
			flagValue("name"),
			flagValue("outcome"),
			flagValue("errors"),
			flagValue("url"),
			customData,
		)
		if err != nil {
			return fmt.Errorf("failed to create pipeline event: %w", err)
		}

		// NOTE(review): customData is parsed and validated above but is not
		// passed to sendEvent, whose signature has no custom-data parameter.
		return sendEvent(
			event,
			viper.GetString("target"),
			viper.GetInt("retries"),
			viper.GetDuration("timeout"),
		)
	},
}

func init() {
	addCommonGenerateFlags(sendPipelineCmd)
	sendCmd.AddCommand(sendPipelineCmd)
}
package events

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/cdevents/sdk-go/pkg/api"
	cdeventsv04 "github.com/cdevents/sdk-go/pkg/api/v04"
	"github.com/google/uuid"
)

// CustomData represents custom data that can be added to events
// This follows the CDEvents spec: https://github.com/cdevents/spec/blob/v0.4.1/spec.md#cdevents-custom-data
type CustomData struct {
	Data        interface{} `json:"customData,omitempty"`
	ContentType string      `json:"customDataContentType,omitempty"`
}

// EventFactory creates CDEvents with common functionality
type EventFactory struct {
	// defaultSource is written into the source of every event the factory creates.
	defaultSource string
}

// NewEventFactory creates a new EventFactory whose events all carry
// defaultSource as their source.
func NewEventFactory(defaultSource string) *EventFactory {
	return &EventFactory{
		defaultSource: defaultSource,
	}
}

// setCommonFields stamps the fields every event shares: a fresh random UUID as
// the event id, the factory's source, the current time and the subject id.
func (ef *EventFactory) setCommonFields(event api.CDEvent, subjectID string) {
	event.SetId(uuid.New().String())
	event.SetSource(ef.defaultSource)
	event.SetTimestamp(time.Now())
	event.SetSubjectId(subjectID)
}

// applySubjectURL sets the subject URL when url is non-empty and the event
// exposes a SetSubjectUrl(string) setter.
func applySubjectURL(event api.CDEvent, url string) {
	if url == "" {
		return
	}
	if target, ok := event.(interface{ SetSubjectUrl(string) }); ok {
		target.SetSubjectUrl(url)
	}
}

// applyOutcome sets the subject outcome and errors on events that expose the
// matching string setters. Each field is probed on its own, so an event that
// supports only one of the two still receives that one. Empty values are skipped.
func applyOutcome(event api.CDEvent, outcome, errors string) {
	if outcome != "" {
		if target, ok := event.(interface{ SetSubjectOutcome(string) }); ok {
			target.SetSubjectOutcome(outcome)
		}
	}
	if errors != "" {
		if target, ok := event.(interface{ SetSubjectErrors(string) }); ok {
			target.SetSubjectErrors(errors)
		}
	}
}

// CreatePipelineRunEvent creates a pipeline run event.
//
// eventType is one of "queued", "started" or "finished"; any other value
// yields an error. pipelineID becomes the subject id. pipelineName and url are
// applied when the event exposes the matching setter; outcome and errors are
// applied only to "finished" events. Empty url, outcome and errors are skipped.
func (ef *EventFactory) CreatePipelineRunEvent(eventType, pipelineID, pipelineName, outcome, errors, url string, customData *CustomData) (api.CDEvent, error) {
	var event api.CDEvent
	var err error

	switch eventType {
	case "queued":
		event, err = cdeventsv04.NewPipelineRunQueuedEvent()
	case "started":
		event, err = cdeventsv04.NewPipelineRunStartedEvent()
	case "finished":
		event, err = cdeventsv04.NewPipelineRunFinishedEvent()
	default:
		return nil, fmt.Errorf("unsupported pipeline run event type: %s", eventType)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create pipeline run event: %w", err)
	}

	ef.setCommonFields(event, pipelineID)

	// Each setter is probed separately: a single combined interface assertion
	// is all-or-nothing and would drop the name whenever one other setter is
	// missing from the concrete event type.
	if named, ok := event.(interface{ SetSubjectPipelineName(string) }); ok {
		named.SetSubjectPipelineName(pipelineName)
	}
	applySubjectURL(event, url)

	if eventType == "finished" {
		applyOutcome(event, outcome, errors)
	}

	// Apply custom data if provided
	if customData != nil {
		ef.applyCustomData(event, customData)
	}

	return event, nil
}

// CreateTaskRunEvent creates a task run event.
//
// eventType is "started" or "finished"; any other value yields an error.
// taskID becomes the subject id. taskName, url and pipelineRunID are applied
// when the event exposes the matching setter; outcome and errors are applied
// only to "finished" events. Empty optional values are skipped.
func (ef *EventFactory) CreateTaskRunEvent(eventType, taskID, taskName, pipelineRunID, outcome, errors, url string, customData *CustomData) (api.CDEvent, error) {
	var event api.CDEvent
	var err error

	switch eventType {
	case "started":
		event, err = cdeventsv04.NewTaskRunStartedEvent()
	case "finished":
		event, err = cdeventsv04.NewTaskRunFinishedEvent()
	default:
		return nil, fmt.Errorf("unsupported task run event type: %s", eventType)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create task run event: %w", err)
	}

	ef.setCommonFields(event, taskID)

	// Probed separately so the name and URL do not depend on the pipeline-run
	// setter having exactly the signature asserted below.
	if named, ok := event.(interface{ SetSubjectTaskName(string) }); ok {
		named.SetSubjectTaskName(taskName)
	}
	applySubjectURL(event, url)
	if pipelineRunID != "" {
		// NOTE(review): this only matches a setter that takes a
		// map[string]interface{}; confirm against the SDK's actual
		// SetSubjectPipelineRun signature, otherwise the id is never applied.
		if linked, ok := event.(interface {
			SetSubjectPipelineRun(map[string]interface{})
		}); ok {
			linked.SetSubjectPipelineRun(map[string]interface{}{
				"id": pipelineRunID,
			})
		}
	}

	if eventType == "finished" {
		applyOutcome(event, outcome, errors)
	}

	// Apply custom data if provided
	if customData != nil {
		ef.applyCustomData(event, customData)
	}

	return event, nil
}

// CreateBuildEvent creates a build event.
//
// eventType is one of "queued", "started" or "finished"; any other value
// yields an error. buildID becomes the subject id. buildName and url are
// applied when the event exposes the matching setter; outcome and errors are
// applied only to "finished" events. Empty url, outcome and errors are skipped.
func (ef *EventFactory) CreateBuildEvent(eventType, buildID, buildName, outcome, errors, url string, customData *CustomData) (api.CDEvent, error) {
	var event api.CDEvent
	var err error

	switch eventType {
	case "queued":
		event, err = cdeventsv04.NewBuildQueuedEvent()
	case "started":
		event, err = cdeventsv04.NewBuildStartedEvent()
	case "finished":
		event, err = cdeventsv04.NewBuildFinishedEvent()
	default:
		return nil, fmt.Errorf("unsupported build event type: %s", eventType)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create build event: %w", err)
	}

	ef.setCommonFields(event, buildID)

	if named, ok := event.(interface{ SetSubjectBuildName(string) }); ok {
		named.SetSubjectBuildName(buildName)
	}
	applySubjectURL(event, url)

	if eventType == "finished" {
		applyOutcome(event, outcome, errors)
	}

	// Apply custom data if provided
	if customData != nil {
		ef.applyCustomData(event, customData)
	}

	return event, nil
}

// CreateServiceEvent creates a service deployment event.
//
// eventType is one of "deployed", "published", "removed", "rolledback" or
// "upgraded"; any other value yields an error. serviceID becomes the subject
// id. serviceName, url and environmentID are applied when the event exposes
// the matching setter. Empty url and environmentID are skipped.
func (ef *EventFactory) CreateServiceEvent(eventType, serviceID, serviceName, environmentID, url string, customData *CustomData) (api.CDEvent, error) {
	var event api.CDEvent
	var err error

	switch eventType {
	case "deployed":
		event, err = cdeventsv04.NewServiceDeployedEvent()
	case "published":
		event, err = cdeventsv04.NewServicePublishedEvent()
	case "removed":
		event, err = cdeventsv04.NewServiceRemovedEvent()
	case "rolledback":
		event, err = cdeventsv04.NewServiceRolledbackEvent()
	case "upgraded":
		event, err = cdeventsv04.NewServiceUpgradedEvent()
	default:
		return nil, fmt.Errorf("unsupported service event type: %s", eventType)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create service event: %w", err)
	}

	ef.setCommonFields(event, serviceID)

	// Probed separately so the name and URL do not depend on the environment
	// setter having exactly the signature asserted below.
	if named, ok := event.(interface{ SetSubjectServiceName(string) }); ok {
		named.SetSubjectServiceName(serviceName)
	}
	applySubjectURL(event, url)
	if environmentID != "" {
		// NOTE(review): this only matches a setter that takes a
		// map[string]interface{}; confirm against the SDK's actual
		// SetSubjectEnvironment signature, otherwise the id is never applied.
		if placed, ok := event.(interface {
			SetSubjectEnvironment(map[string]interface{})
		}); ok {
			placed.SetSubjectEnvironment(map[string]interface{}{
				"id": environmentID,
			})
		}
	}

	// Apply custom data if provided
	if customData != nil {
		ef.applyCustomData(event, customData)
	}

	return event, nil
}

// CreateTestEvent creates a test event.
//
// eventType is one of "testcase-queued", "testcase-started",
// "testcase-finished", "testcase-skipped", "testsuite-queued",
// "testsuite-started", "testsuite-finished" or "testoutput-published"; any
// other value yields an error. testID becomes the subject id. testName and url
// are applied to test case and test suite events that expose the matching
// setter; outcome and errors only to the two "-finished" types.
// "testoutput-published" events receive only the common fields.
func (ef *EventFactory) CreateTestEvent(eventType, testID, testName, outcome, errors, url string, customData *CustomData) (api.CDEvent, error) {
	var event api.CDEvent
	var err error

	switch eventType {
	case "testcase-queued":
		event, err = cdeventsv04.NewTestCaseRunQueuedEvent()
	case "testcase-started":
		event, err = cdeventsv04.NewTestCaseRunStartedEvent()
	case "testcase-finished":
		event, err = cdeventsv04.NewTestCaseRunFinishedEvent()
	case "testcase-skipped":
		event, err = cdeventsv04.NewTestCaseRunSkippedEvent()
	case "testsuite-queued":
		event, err = cdeventsv04.NewTestSuiteRunQueuedEvent()
	case "testsuite-started":
		event, err = cdeventsv04.NewTestSuiteRunStartedEvent()
	case "testsuite-finished":
		event, err = cdeventsv04.NewTestSuiteRunFinishedEvent()
	case "testoutput-published":
		event, err = cdeventsv04.NewTestOutputPublishedEvent()
	default:
		return nil, fmt.Errorf("unsupported test event type: %s", eventType)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to create test event: %w", err)
	}

	ef.setCommonFields(event, testID)

	// Set test-specific fields based on event type
	switch eventType {
	case "testcase-queued", "testcase-started", "testcase-finished", "testcase-skipped":
		if named, ok := event.(interface{ SetSubjectTestCaseName(string) }); ok {
			named.SetSubjectTestCaseName(testName)
		}
		applySubjectURL(event, url)
	case "testsuite-queued", "testsuite-started", "testsuite-finished":
		if named, ok := event.(interface{ SetSubjectTestSuiteName(string) }); ok {
			named.SetSubjectTestSuiteName(testName)
		}
		applySubjectURL(event, url)
	}

	if eventType == "testcase-finished" || eventType == "testsuite-finished" {
		applyOutcome(event, outcome, errors)
	}

	// Apply custom data if provided
	if customData != nil {
		ef.applyCustomData(event, customData)
	}

	return event, nil
}

// applyCustomData applies custom data to a CDEvent
// Note: The current CDEvents SDK v0.4.1 doesn't support direct custom data injection,
// so we handle custom data in the output formatters instead.
func (ef *EventFactory) applyCustomData(event api.CDEvent, customData *CustomData) {
	// Intentionally a no-op: custom data is merged in by the output formatters.
	// This function is kept for future SDK versions that may support direct custom data
}

// ParseCustomDataFromJSON parses custom data from a JSON string.
// It returns (nil, nil) for an empty string, an error for invalid JSON, and
// otherwise the decoded value with content type "application/json".
func ParseCustomDataFromJSON(jsonData string) (*CustomData, error) {
	if jsonData == "" {
		return nil, nil
	}

	// Decode into a generic value; any valid JSON document is accepted.
	var data interface{}
	if err := json.Unmarshal([]byte(jsonData), &data); err != nil {
		return nil, fmt.Errorf("failed to parse custom data JSON: %w", err)
	}

	return &CustomData{
		Data:        data,
		ContentType: "application/json",
	}, nil
}
package output

import (
	"encoding/json"
	"fmt"

	"github.com/cdevents/sdk-go/pkg/api"
	"gopkg.in/yaml.v3"
)

// CustomData represents custom data that can be added to events
// This follows the CDEvents spec: https://github.com/cdevents/spec/blob/v0.4.1/spec.md#cdevents-custom-data
type CustomData struct {
	Data        interface{} `json:"customData,omitempty"`
	ContentType string      `json:"customDataContentType,omitempty"`
}

// FormatOutput formats the CDEvent based on the specified format
// ("json", "yaml" or "cloudevent") without custom data.
func FormatOutput(event api.CDEvent, format string) (string, error) {
	return FormatOutputWithCustomData(event, nil, format)
}

// FormatOutputWithCustomData formats the CDEvent with custom data based on the
// specified format. Supported formats are "json", "yaml" and "cloudevent"; any
// other value yields an error. customData may be nil.
func FormatOutputWithCustomData(event api.CDEvent, customData *CustomData, format string) (string, error) {
	switch format {
	case "json":
		return formatJSONWithCustomData(event, customData)
	case "yaml":
		return formatYAMLWithCustomData(event, customData)
	case "cloudevent":
		return formatCloudEventWithCustomData(event, customData)
	default:
		return "", fmt.Errorf("unsupported output format: %s", format)
	}
}

// eventToMap converts event into a generic map by a JSON round trip, so the
// keys are the ones defined by the event's JSON encoding.
func eventToMap(event interface{}) (map[string]interface{}, error) {
	encoded, err := json.Marshal(event)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal event: %w", err)
	}

	var eventMap map[string]interface{}
	if err := json.Unmarshal(encoded, &eventMap); err != nil {
		return nil, fmt.Errorf("failed to unmarshal event: %w", err)
	}
	return eventMap, nil
}

// mergeCustomData writes the non-empty parts of customData into target under
// the "customData" and "customDataContentType" keys. A nil customData leaves
// target untouched.
func mergeCustomData(target map[string]interface{}, customData *CustomData) {
	if customData == nil {
		return
	}
	if customData.Data != nil {
		target["customData"] = customData.Data
	}
	if customData.ContentType != "" {
		target["customDataContentType"] = customData.ContentType
	}
}

// formatJSON formats the event as JSON
func formatJSON(event api.CDEvent) (string, error) {
	return formatJSONWithCustomData(event, nil)
}

// formatJSONWithCustomData formats the event as indented JSON, adding the
// custom data (when non-nil) at the root level of the document.
func formatJSONWithCustomData(event api.CDEvent, customData *CustomData) (string, error) {
	eventMap, err := eventToMap(event)
	if err != nil {
		return "", err
	}

	// Add custom data according to CDEvents spec at the root level
	mergeCustomData(eventMap, customData)

	data, err := json.MarshalIndent(eventMap, "", " ")
	if err != nil {
		return "", fmt.Errorf("failed to marshal event with custom data to JSON: %w", err)
	}

	return string(data), nil
}

// formatYAML formats the event as YAML
func formatYAML(event api.CDEvent) (string, error) {
	return formatYAMLWithCustomData(event, nil)
}

// formatYAMLWithCustomData formats the event as YAML, adding the custom data
// (when non-nil) at the root level of the document.
//
// The event always goes through its JSON encoding first. Marshalling the event
// struct directly with yaml.Marshal bypasses the JSON encoding, so the same
// event would be rendered by two different paths depending on whether custom
// data is attached.
func formatYAMLWithCustomData(event api.CDEvent, customData *CustomData) (string, error) {
	eventMap, err := eventToMap(event)
	if err != nil {
		return "", err
	}

	// Add custom data according to CDEvents spec at the root level
	mergeCustomData(eventMap, customData)

	data, err := yaml.Marshal(eventMap)
	if err != nil {
		if customData == nil {
			return "", fmt.Errorf("failed to marshal event to YAML: %w", err)
		}
		return "", fmt.Errorf("failed to marshal event with custom data to YAML: %w", err)
	}

	return string(data), nil
}

// formatCloudEvent formats the event as CloudEvent JSON
func formatCloudEvent(event api.CDEvent) (string, error) {
	return formatCloudEventWithCustomData(event, nil)
}

// formatCloudEventWithCustomData formats the event as indented CloudEvent JSON.
// When customData is non-nil it is added inside the CloudEvent "data" object.
func formatCloudEventWithCustomData(event api.CDEvent, customData *CustomData) (string, error) {
	ce, err := api.AsCloudEvent(event)
	if err != nil {
		return "", fmt.Errorf("failed to convert to CloudEvent: %w", err)
	}

	if customData == nil {
		data, err := json.MarshalIndent(ce, "", " ")
		if err != nil {
			return "", fmt.Errorf("failed to marshal CloudEvent to JSON: %w", err)
		}
		return string(data), nil
	}

	// Round-trip through JSON so the payload can be edited as a generic map.
	ceData, err := json.Marshal(ce)
	if err != nil {
		return "", fmt.Errorf("failed to marshal CloudEvent: %w", err)
	}

	var ceMap map[string]interface{}
	if err := json.Unmarshal(ceData, &ceMap); err != nil {
		return "", fmt.Errorf("failed to unmarshal CloudEvent: %w", err)
	}

	// Custom data belongs to the CDEvent, which travels in the CloudEvent's
	// "data" field. If "data" is missing or is not a JSON object, the custom
	// data is left out and no error is raised.
	if payload, ok := ceMap["data"].(map[string]interface{}); ok {
		mergeCustomData(payload, customData)
	}

	data, err := json.MarshalIndent(ceMap, "", " ")
	if err != nil {
		return "", fmt.Errorf("failed to marshal CloudEvent with custom data to JSON: %w", err)
	}

	return string(data), nil
}

// FormatMultipleEvents formats multiple events as a single array document in
// the given format ("json", "yaml" or "cloudevent"); any other value yields an
// error.
func FormatMultipleEvents(events []api.CDEvent, format string) (string, error) {
	switch format {
	case "json":
		return formatMultipleJSON(events)
	case "yaml":
		return formatMultipleYAML(events)
	case "cloudevent":
		return formatMultipleCloudEvents(events)
	default:
		return "", fmt.Errorf("unsupported output format: %s", format)
	}
}

// formatMultipleJSON formats multiple events as JSON array
func formatMultipleJSON(events []api.CDEvent) (string, error) {
	data, err := json.MarshalIndent(events, "", " ")
	if err != nil {
		return "", fmt.Errorf("failed to marshal events to JSON: %w", err)
	}
	return string(data), nil
}

// formatMultipleYAML formats multiple events as YAML array. Each event goes
// through its JSON encoding first, matching formatYAMLWithCustomData.
func formatMultipleYAML(events []api.CDEvent) (string, error) {
	items := make([]map[string]interface{}, 0, len(events))
	for _, event := range events {
		eventMap, err := eventToMap(event)
		if err != nil {
			return "", err
		}
		items = append(items, eventMap)
	}

	data, err := yaml.Marshal(items)
	if err != nil {
		return "", fmt.Errorf("failed to marshal events to YAML: %w", err)
	}
	return string(data), nil
}

// formatMultipleCloudEvents formats multiple events as CloudEvents JSON array
func formatMultipleCloudEvents(events []api.CDEvent) (string, error) {
	// Allocated non-nil so that an empty input renders as "[]" rather than "null".
	cloudEvents := make([]interface{}, 0, len(events))

	for _, event := range events {
		ce, err := api.AsCloudEvent(event)
		if err != nil {
			return "", fmt.Errorf("failed to convert event to CloudEvent: %w", err)
		}
		cloudEvents = append(cloudEvents, ce)
	}

	data, err := json.MarshalIndent(cloudEvents, "", " ")
	if err != nil {
		return "", fmt.Errorf("failed to marshal CloudEvents to JSON: %w", err)
	}

	return string(data), nil
}
package transport

import (
	"context"
	"fmt"
	"strings"

	"github.com/cdevents/sdk-go/pkg/api"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

// Transport interface for sending events
type Transport interface {
	// Send delivers event and returns nil on success.
	Send(ctx context.Context, event api.CDEvent) error
}

// HTTPTransport sends events via HTTP
type HTTPTransport struct {
	client cloudevents.Client
	target string
}

// NewHTTPTransport creates a new HTTP transport that posts to target, then
// applies options in order. It returns an error when the CloudEvents HTTP
// client cannot be created.
func NewHTTPTransport(target string, options ...HTTPOption) (*HTTPTransport, error) {
	client, err := cloudevents.NewClientHTTP()
	if err != nil {
		return nil, fmt.Errorf("failed to create HTTP client: %w", err)
	}

	transport := &HTTPTransport{
		client: client,
		target: target,
	}

	for _, option := range options {
		option(transport)
	}

	return transport, nil
}

// HTTPOption configures HTTP transport
type HTTPOption func(*HTTPTransport)

// WithHTTPHeaders adds custom headers to HTTP requests.
//
// NOTE(review): the returned option is currently a no-op; headers is accepted
// but not stored or sent.
func WithHTTPHeaders(headers map[string]string) HTTPOption {
	return func(t *HTTPTransport) {
		// Logic to configure headers goes here if needed
	}
}

// Send sends an event via HTTP as a binary-encoded CloudEvent.
//
// It returns an error when the event cannot be converted to a CloudEvent or
// when the send result is not an acknowledgement. That covers both events that
// could not be delivered and events that were delivered but rejected by the
// receiver; checking only for "undelivered" would report a rejection as
// success.
func (t *HTTPTransport) Send(ctx context.Context, event api.CDEvent) error {
	ce, err := api.AsCloudEvent(event)
	if err != nil {
		return fmt.Errorf("failed to convert to CloudEvent: %w", err)
	}

	ctx = cloudevents.ContextWithTarget(ctx, t.target)
	ctx = cloudevents.WithEncodingBinary(ctx)

	result := t.client.Send(ctx, *ce)
	if !cloudevents.IsACK(result) {
		return fmt.Errorf("failed to send event: %w", result)
	}

	return nil
}

// ConsoleTransport outputs events to console
type ConsoleTransport struct {
	format string
}

// NewConsoleTransport creates a new console transport
func NewConsoleTransport(format string) *ConsoleTransport {
	return &ConsoleTransport{
		format: format,
	}
}

// Send outputs an event to console.
//
// NOTE(review): placeholder. Only a one-line notice with the event id is
// printed; the event body is not rendered and the format field is unused.
func (t *ConsoleTransport) Send(ctx context.Context, event api.CDEvent) error {
	// This would use the output package to format the event
	// For now, just print a simple message
	fmt.Printf("Event sent to console: %s\n", event.GetId())
	return nil
}

// FileTransport writes events to a file
type FileTransport struct {
	filename string
	format   string
}

// NewFileTransport creates a new file transport
func NewFileTransport(filename, format string) *FileTransport {
	return &FileTransport{
		filename: filename,
		format:   format,
	}
}

// Send writes an event to a file.
//
// NOTE(review): placeholder. Nothing is written to the file yet; only a notice
// with the file name and event id is printed to standard output.
func (t *FileTransport) Send(ctx context.Context, event api.CDEvent) error {
	// Implementation would write to file
	fmt.Printf("Event sent to file %s: %s\n", t.filename, event.GetId())
	return nil
}

// KafkaTransport sends events to Kafka
type KafkaTransport struct {
	brokers []string
	topic   string
	client  cloudevents.Client
}

// NewKafkaTransport creates a new Kafka transport.
// Kafka is not supported yet, so it always returns an error.
func NewKafkaTransport(brokers []string, topic string) (*KafkaTransport, error) {
	// For now, return an error indicating Kafka support is not implemented
	return nil, fmt.Errorf("Kafka transport not implemented yet")
}

// Send sends an event to Kafka.
// Kafka is not supported yet, so it always returns an error.
func (t *KafkaTransport) Send(ctx context.Context, event api.CDEvent) error {
	return fmt.Errorf("Kafka transport not implemented yet")
}

// TransportFactory creates transports based on configuration
type TransportFactory struct{}

// NewTransportFactory creates a new transport factory
func NewTransportFactory() *TransportFactory {
	return &TransportFactory{}
}

// CreateTransport creates a transport based on the target URL:
//
//   - "" or "console": a console transport
//   - "http://..." or "https://...": an HTTP transport posting to target
//   - "file://<name>": a file transport for <name>
//   - "kafka://...": an error, Kafka is not implemented
//
// Any other target yields an "unsupported transport target" error.
func (f *TransportFactory) CreateTransport(target string) (Transport, error) {
	if target == "" || target == "console" {
		return NewConsoleTransport("json"), nil
	}

	if strings.HasPrefix(target, "http://") || strings.HasPrefix(target, "https://") {
		return NewHTTPTransport(target)
	}

	if strings.HasPrefix(target, "file://") {
		filename := strings.TrimPrefix(target, "file://")
		return NewFileTransport(filename, "json"), nil
	}

	if strings.HasPrefix(target, "kafka://") {
		return nil, fmt.Errorf("Kafka transport not implemented yet")
	}

	return nil, fmt.Errorf("unsupported transport target: %s", target)
}

// MultiTransport sends events to multiple transports
type MultiTransport struct {
	transports []Transport
}

// NewMultiTransport creates a new multi transport
func NewMultiTransport(transports ...Transport) *MultiTransport {
	return &MultiTransport{
		transports: transports,
	}
}

// Send sends an event to all configured transports.
// Every transport is tried even when an earlier one fails. If any of them
// failed, the returned error lists all failure messages joined by "; ".
func (t *MultiTransport) Send(ctx context.Context, event api.CDEvent) error {
	var failures []string

	for _, tr := range t.transports {
		if err := tr.Send(ctx, event); err != nil {
			failures = append(failures, err.Error())
		}
	}

	if len(failures) > 0 {
		return fmt.Errorf("transport errors: %s", strings.Join(failures, "; "))
	}

	return nil
}