AWS SQS - Send/Receive/Delete SQS messages

AWS SQS - Send/Receive/Delete SQS messages

Tags
Engineering
SQS
AWS
Created
Dec 28, 2022 04:14 AM
Edited
Jan 12, 2022
Description
A simple SQS client written in Go that sends, receives, and deletes messages.
package main

import (
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/joho/godotenv"
)

const (
	// MESSAGE_COUNT is the number of produce goroutines main launches,
	// i.e. how many messages are sent to the queue per run.
	MESSAGE_COUNT = 10
	// NUM_WORKERS is the number of worker goroutines consume starts to
	// process received messages.
	NUM_WORKERS   = 3
)

// SQS lists the two queue operations this example exercises.
// Client provides both methods.
type SQS interface {
	// produce sends a message to the queue.
	produce()
	// consume receives messages from the queue and hands them to workers.
	consume()
}

// Client bundles the SQS service handle with the URL of the queue that
// produce and consume operate on.
type Client struct {
	// svc is the AWS SDK SQS client used for every send/receive/delete call.
	svc      *sqs.SQS
	// queueURL identifies the target queue; setup reads it from QUEUE_URL.
	queueURL string
}

// main builds the SQS client, launches MESSAGE_COUNT concurrent sends, and
// then hands control to the consumer, which loops forever.
func main() {
	service, url := setup()
	sqsClient := Client{svc: service, queueURL: url}

	// Every send runs in its own goroutine so publishing does not hold up
	// the consumer below.
	for remaining := MESSAGE_COUNT; remaining > 0; remaining-- {
		go sqsClient.produce()
	}

	// consume never returns; it keeps the process alive while the worker
	// pool drains the queue.
	sqsClient.consume()
}

// setup loads environment configuration through godotenv, opens an AWS
// session for the region named by the REGION variable, and returns an SQS
// service handle together with the queue URL taken from QUEUE_URL.
//
// Both the godotenv and the session errors are passed to checkError, which
// is defined outside this file.
func setup() (*sqs.SQS, string) {
	checkError(godotenv.Load())

	cfg := &aws.Config{Region: aws.String(os.Getenv("REGION"))}
	sess, err := session.NewSession(cfg)
	checkError(err)

	return sqs.New(sess), os.Getenv("QUEUE_URL")
}
main.go
package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// produce sends one fixed-text message to the client's queue and logs the
// SDK's response. Any send error is handed to checkError.
func (client Client) produce() {
	output, err := client.svc.SendMessage(&sqs.SendMessageInput{
		MessageBody: aws.String("A message from Alvin Lin."),
		QueueUrl:    aws.String(client.queueURL),
	})
	checkError(err)

	log.Println(output)
}
producer.go
package main

import (
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/sqs"
)

// consume starts NUM_WORKERS worker goroutines and then polls the queue in
// an endless loop, passing each receive result to the workers over an
// unbuffered channel. It never returns.
func (client Client) consume() {
	batches := make(chan *sqs.ReceiveMessageOutput)

	// Worker ids are 1-based so log lines read "Worker 1", "Worker 2", ...
	for started := 0; started < NUM_WORKERS; started++ {
		go worker(client.svc, client.queueURL, started+1, batches)
	}

	// The channel is unbuffered, so each send blocks until a worker is
	// free to take the batch.
	for {
		batch := receive(client.svc, client.queueURL)
		batches <- batch
	}
}

// worker drains the messages channel until it is closed. For each receive
// result that carries at least one message it logs its id, sleeps for one
// second as stand-in processing, and then deletes the messages from the
// queue. Results with no messages are only logged.
func worker(svc *sqs.SQS, queueURL string, id int, messages <-chan *sqs.ReceiveMessageOutput) {
	for batch := range messages {
		// Nothing was returned by this poll; just note it and move on.
		if len(batch.Messages) == 0 {
			log.Println("Empty message")
			continue
		}

		log.Printf("Worker %d process message", id)
		time.Sleep(time.Second)
		delete(svc, queueURL, batch)
	}
}

// receive performs a single ReceiveMessage call against queueURL with a
// one-second wait time (long polling), logs the result, and returns it.
// Any receive error is handed to checkError.
func receive(svc *sqs.SQS, queueURL string) *sqs.ReceiveMessageOutput {
	const waitSeconds int64 = 1

	input := &sqs.ReceiveMessageInput{
		QueueUrl:        aws.String(queueURL),
		WaitTimeSeconds: aws.Int64(waitSeconds), // long polling
	}

	output, err := svc.ReceiveMessage(input)
	checkError(err)

	log.Println(output)
	return output
}

// Using SQS, we have to delete message from the queue ourselves
func delete(svc *sqs.SQS, queueURL string, msgResult *sqs.ReceiveMessageOutput) {
	for _, message := range msgResult.Messages {
		_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
			QueueUrl:      aws.String(queueURL),
			ReceiptHandle: message.ReceiptHandle,
		})
		checkError(err)
	}
consumer.go

Related Articles