package main
import (
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/joho/godotenv"
)
const (
MESSAGE_COUNT = 10
NUM_WORKERS = 3
)
type SQS interface {
produce()
consume()
}
type Client struct {
svc *sqs.SQS
queueURL string
}
func main() {
svc, queueURL := setup()
sqsClient := Client{
svc: svc,
queueURL: queueURL,
}
for i := 0; i < MESSAGE_COUNT; i++ {
go sqsClient.produce()
}
sqsClient.consume()
}
func setup() (*sqs.SQS, string) {
err := godotenv.Load()
checkError(err)
sess, err := session.NewSession(&aws.Config{
Region: aws.String(os.Getenv("REGION"))},
)
checkError(err)
svc := sqs.New(sess)
queueURL := os.Getenv("QUEUE_URL")
return svc, queueURL
}
main.gopackage main
import (
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)
func (client Client) produce() {
sendMessageInput := sqs.SendMessageInput{
MessageBody: aws.String("A message from Alvin Lin."),
QueueUrl: aws.String(client.queueURL),
}
sendMessageOutput, err := client.svc.SendMessage(&sendMessageInput)
checkError(err)
log.Println(sendMessageOutput)
}
producer.gopackage main
import (
"log"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
)
func (client Client) consume() {
messages := make(chan *sqs.ReceiveMessageOutput)
for workerId := 1; workerId <= NUM_WORKERS; workerId++ {
go worker(client.svc, client.queueURL, workerId, messages)
}
for {
messages <- receive(client.svc, client.queueURL)
}
}
func worker(svc *sqs.SQS, queueURL string, id int, messages <-chan *sqs.ReceiveMessageOutput) {
for message := range messages {
if len(message.Messages) != 0 {
log.Printf("Worker %d process message", id)
time.Sleep(1 * time.Second)
delete(svc, queueURL, message)
} else {
log.Println("Empty message")
}
}
}
func receive(svc *sqs.SQS, queueURL string) *sqs.ReceiveMessageOutput {
waitTimeSeconds := 1
msgResult, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: aws.String(queueURL),
WaitTimeSeconds: aws.Int64(int64(waitTimeSeconds)),
})
checkError(err)
log.Println(msgResult)
return msgResult
}
func delete(svc *sqs.SQS, queueURL string, msgResult *sqs.ReceiveMessageOutput) {
for _, message := range msgResult.Messages {
_, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
QueueUrl: aws.String(queueURL),
ReceiptHandle: message.ReceiptHandle,
})
checkError(err)
}
consumer.goRelated Articles