Browse Source

Project start

master
Birunda 2 years ago
commit
13e004f9c5
  1. 4
      .gitignore
  2. 386
      amqp.go
  3. 9
      go.mod
  4. 13
      go.sum

4
.gitignore

@ -0,0 +1,4 @@
*~
*.swo
*.swp
.env

386
amqp.go

@ -0,0 +1,386 @@
package kunhatanAmqp
import (
"fmt"
"os"
"errors"
"time"
"strconv"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sony/gobreaker"
"git.83channel.org/birunda/kunhatan-log-client-go"
)
var conn1 *amqp.Connection
var conn2 *amqp.Connection
var conn3 *amqp.Connection
var ch1 *amqp.Channel
var ch2 *amqp.Channel
var ch3 *amqp.Channel
var responseQueue string = "amq.rabbitmq.reply-to"
var responseTimeout int
var CircuitBreakerMaxFailsBeforeClosing uint32 = 5
var ReloadStylesQueue string
var circuitBreakers map[string]*gobreaker.CircuitBreaker
func failOnError(err error, msg string) {
if err != nil {
logger.Fatalf("%s: %s", msg, err)
}
}
func makeCircuitBreaker(name string) {
logger.Tracef("Making new circuit breaker for %s", name)
var circuitBreakerSettings gobreaker.Settings
circuitBreakerSettings.Name = name
circuitBreakerSettings.MaxRequests = 5
circuitBreakerSettings.Interval = time.Duration(10) * time.Second
circuitBreakerSettings.Timeout = time.Duration(20) * time.Second
circuitBreakerSettings.ReadyToTrip = func(counts gobreaker.Counts) bool {
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= CircuitBreakerMaxFailsBeforeClosing &&
failureRatio >= 0.6
}
circuitBreakerSettings.OnStateChange = func(name string, from, to gobreaker.State) {
logger.Debugf("Circuit breaker %s state change: %s", name, to.String())
}
circuitBreakers[name] = gobreaker.NewCircuitBreaker(circuitBreakerSettings)
}
func setupConfig() {
logger.SetLoggerName(os.Getenv("WORKER_NAME"))
circuitBreakers = make(map[string]*gobreaker.CircuitBreaker)
{
art, err := strconv.Atoi(os.Getenv("AMQP_RESPONSE_TIMEOUT"))
if err != nil {
responseTimeout = 5
} else {
responseTimeout = art
}
}
}
func connect() {
var err error
logger.Trace("Connecting to message broker...")
// conn1, err = amqp.Dial("amqp://kunhatan:[email protected]:5672/")
conn1, err = amqp.Dial(os.Getenv("AMQP_URI"))
failOnError(err, "Failed to connect to RabbitMQ")
conn2, err = amqp.Dial(os.Getenv("AMQP_URI"))
failOnError(err, "Failed to connect to RabbitMQ")
conn3, err = amqp.Dial(os.Getenv("AMQP_URI"))
failOnError(err, "Failed to connect to RabbitMQ")
logger.Trace("Connection with the message broker was a success.")
}
func disconnect() {
conn1.Close()
conn2.Close()
}
func setupChannels() {
_ch1, err := conn1.Channel()
failOnError(err, "Failed to open a channel")
ch1 = _ch1
ch3, err = conn3.Channel()
}
func closeChannels() {
ch1.Close()
ch3.Close()
}
func setupExchanges() {
}
func setupQueues() {
_, err := ch1.QueueDeclare("amq.rabbitmq.reply-to", false, true, false, false, nil)
if err != nil {
fmt.Printf("%s", err)
}
}
func ConsumeQueue(
queueName string,
handler func(
msg amqp.Delivery,
expectReply bool,
reply func(message []byte),
),
) {
channel, _ := conn1.Channel()
logger.Tracef("Consuming queue %s...", queueName)
msgs, err := channel.Consume(
queueName,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
logger.Errorf("Error on consuming %s queue: %s", queueName, err)
}
for d := range msgs {
logger.Tracef("Got message on %s: %s", d.RoutingKey, string(d.Body))
if len(d.ReplyTo) == 0 {
go handler(d, false, func(message []byte) {})
} else {
go handler(
d,
true,
func(message []byte) {
logger.Tracef("Publishing message: %s", string(message))
PublishMessageToExchange(
"",
d.ReplyTo,
message,
false,
d.CorrelationId,
)
},
)
}
}
logger.Tracef("Consumer for %s terminated.", queueName)
channel.Close()
}
func RPCRequest(
exchange string,
routingKey string,
message []byte,
correlationId string,
) (amqp.Delivery, error) {
channel, err := conn2.Channel()
if err != nil {
logger.Errorf("RPC channel opening error: %s", err)
return amqp.Delivery{
Body: []byte(`{"code": 500, "content":{"errors":["Internal server error"]}}`),
}, err
}
taskType := fmt.Sprintf("%s_%s", exchange, routingKey)
testCircuitBreaker:
circuitBreaker, ok := circuitBreakers[taskType]
if !ok {
makeCircuitBreaker(taskType)
goto testCircuitBreaker
}
body, err := circuitBreaker.Execute(func() (interface{}, error) {
msgs, err := channel.Consume(
responseQueue,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
logger.Errorf("Couldn't consume response queue %s: %s", responseQueue, err)
return nil, err
}
logger.Tracef(
"Publishing message to %s - %s - reply-to: %s: %s ",
routingKey, correlationId, responseQueue, string(message),
)
err = channel.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: correlationId,
Body: message,
ReplyTo: responseQueue,
},
)
if err != nil {
logger.Errorf("Error on message publish: %s", err)
return nil, err
}
timeout := time.Second * time.Duration(responseTimeout)
for {
select {
case <-time.After(timeout):
return nil, errors.New("Timeout")
case d := <-msgs:
if d.CorrelationId == correlationId {
return d, nil
}
}
}
})
channel.Close()
if err != nil {
logger.Errorf("%s", err)
return amqp.Delivery{
Body: []byte(fmt.Sprintf(
`{"code": 500, "content": {"errors": ["%s"]}}`,
err.Error(),
)),
}, err
}
return body.(amqp.Delivery), nil
}
func OnceTaskIsComplete(taskId, taskType string) amqp.Delivery {
testCircuitBreaker:
circuitBreaker, ok := circuitBreakers[taskType]
if !ok {
makeCircuitBreaker(taskType)
goto testCircuitBreaker
}
// logger.Debugf(
// "Using circuit breaker %s for Request %s of %s type",
// circuitBreaker.Name(), taskId, taskType,
// )
logger.Debugf("Failures for %s: %d", taskType, circuitBreaker.Counts().TotalFailures)
body, err := circuitBreaker.Execute(func() (interface{}, error) {
channel, _ := conn1.Channel()
logger.Debugf("Waiting task on %s", responseQueue)
msgs, err := channel.Consume(
responseQueue,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
logger.Errorf("Error on consuming %s queue: %s", responseQueue, err)
}
timeout := time.Second * time.Duration(responseTimeout)
for {
select {
case <-time.After(timeout):
logger.Errorf("Task %s timeout!", taskId)
return nil, errors.New("Timeout")
case d := <-msgs:
logger.Debugf("Received task %s", d.CorrelationId)
if d.CorrelationId == taskId {
// d.Ack(true)
channel.Cancel(d.ConsumerTag, false)
channel.Close()
return d, nil
}
// d.Nack(true, true)
}
}
})
if err != nil {
logger.Error(err)
return amqp.Delivery{
Body: []byte(`{"code": 500}`),
}
}
return body.(amqp.Delivery)
}
func WithChannel(handler func(channel *amqp.Channel)) {
handler(ch1)
}
func StartMessageQueue() {
setupConfig()
logger.Trace("Starting message queue...")
connect()
setupChannels()
setupExchanges()
setupQueues()
}
func StopMessageQueue() {
logger.Trace("Stopping message queue...")
closeChannels()
disconnect()
logger.Trace("Message queue stopped.")
}
func PublishMessageToExchange(
exchange string,
routingKey string,
message []byte,
expectReply bool,
correlationId string,
) error {
replyTo := ""
if expectReply {
replyTo = responseQueue
}
logger.Tracef(
"Publishing message to %s - %s - reply-to: %s: %s ",
routingKey, correlationId, replyTo, string(message),
)
err := ch3.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: correlationId,
Body: message,
ReplyTo: replyTo,
},
)
if err != nil {
logger.Errorf("Error on message publish: %s", err)
return err
}
return nil
}

9
go.mod

@ -0,0 +1,9 @@
module git.83channel.org/birunda/kunhatan-amqp-go
go 1.17
require (
git.83channel.org/birunda/kunhatan-log-client-go v0.0.0-20220104000509-22b523871751
github.com/rabbitmq/amqp091-go v1.2.0
github.com/sony/gobreaker v0.5.0
)

13
go.sum

@ -0,0 +1,13 @@
git.83channel.org/birunda/kunhatan-log-client-go v0.0.0-20220104000509-22b523871751 h1:X7bj0Rnz1SU2A3TNLvcOkPwgf7MAJEPAq+t2dx3K3sM=
git.83channel.org/birunda/kunhatan-log-client-go v0.0.0-20220104000509-22b523871751/go.mod h1:tWxZXsge85+XAPY35uq3EXTLfT338ZNKGeBt+0ZcaAU=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.2.0 h1:1pHBxAsQh54R9eX/xo679fUEAfv3loMqi0pvRFOj2nk=
github.com/rabbitmq/amqp091-go v1.2.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Loading…
Cancel
Save