Kunhatan Go microsservice AMQP client.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

541 lines
12 KiB

package kunhatanAmqp
import (
"fmt"
"os"
"errors"
"time"
"strconv"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sony/gobreaker"
"git.83channel.org/birunda/kunhatan-log-client-go"
)
var conn1 *amqp.Connection
var conn2 *amqp.Connection
var conn3 *amqp.Connection
var ch1 *amqp.Channel
var ch2 *amqp.Channel
var ch3 *amqp.Channel
var responseQueue string = "amq.rabbitmq.reply-to"
var responseTimeout int
var CircuitBreakerMaxFailsBeforeClosing uint32 = 5
var ReloadStylesQueue string
var circuitBreakers map[string]*gobreaker.CircuitBreaker
var isConnecting bool = false
var hadConnection bool = false
var cancelNewConnections bool = false
func failOnError(err error, msg string) {
if err != nil {
logger.Fatalf("%s: %s", msg, err)
}
}
func makeCircuitBreaker(name string) {
logger.Infof("Making new circuit breaker for %s", name)
var circuitBreakerSettings gobreaker.Settings
circuitBreakerSettings.Name = name
circuitBreakerSettings.MaxRequests = 5
circuitBreakerSettings.Interval = time.Duration(10) * time.Second
circuitBreakerSettings.Timeout = time.Duration(20) * time.Second
circuitBreakerSettings.ReadyToTrip = func(counts gobreaker.Counts) bool {
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= CircuitBreakerMaxFailsBeforeClosing &&
failureRatio >= 0.6
}
circuitBreakerSettings.OnStateChange = func(name string, from, to gobreaker.State) {
logger.Debugf("Circuit breaker %s state change: %s", name, to.String())
}
circuitBreakers[name] = gobreaker.NewCircuitBreaker(circuitBreakerSettings)
}
func setupConfig() {
logger.SetLoggerName(os.Getenv("WORKER_NAME"))
circuitBreakers = make(map[string]*gobreaker.CircuitBreaker)
{
art, err := strconv.Atoi(os.Getenv("AMQP_RESPONSE_TIMEOUT"))
if err != nil {
responseTimeout = 5
} else {
responseTimeout = art
}
}
}
func connect() {
if isConnecting || cancelNewConnections {
return
}
isConnecting = true
var err error
logger.Info("Connecting to message broker...")
if hadConnection {
if conn1 != nil {
conn1.Close()
}
conn1, err = amqp.Dial(os.Getenv("AMQP_URI"))
} else {
conn1, err = amqp.Dial(os.Getenv("AMQP_URI"))
failOnError(err, "Failed to connect to RabbitMQ")
}
if hadConnection {
if conn2 != nil {
conn2.Close()
}
conn2, err = amqp.Dial(os.Getenv("AMQP_URI"))
} else {
conn2, err = amqp.Dial(os.Getenv("AMQP_URI"))
failOnError(err, "Failed to connect to RabbitMQ")
}
if hadConnection {
if conn3 != nil {
conn3.Close()
}
conn3, err = amqp.Dial(os.Getenv("AMQP_URI"))
} else {
conn3, err = amqp.Dial(os.Getenv("AMQP_URI"))
failOnError(err, "Failed to connect to RabbitMQ")
}
if conn1 != nil && !conn1.IsClosed() {
logger.Info("Connection with the message broker was a success.")
} else {
logger.Error("Connection with the message broker failed.")
}
isConnecting = false
hadConnection = true
}
func disconnect() {
cancelNewConnections = true
conn1.Close()
conn2.Close()
}
func setupChannels() {
_ch1, err := conn1.Channel()
failOnError(err, "Failed to open a channel")
ch1 = _ch1
ch3, err = conn3.Channel()
failOnError(err, "Failed to open a channel")
}
func closeChannels() {
ch1.Close()
ch3.Close()
}
func setupExchanges() {
}
func setupQueues() {
_, err := ch1.QueueDeclare("amq.rabbitmq.reply-to", false, true, false, false, nil)
if err != nil {
logger.Errorf("%s", err)
}
}
func ConsumeQueue(
queueName string,
handler func(
msg amqp.Delivery,
expectReply bool,
reply func(message []byte),
),
) {
logger.Infof("Consuming queue %s", queueName)
consumer_start:
if conn1 == nil || conn1.IsClosed() {
if cancelNewConnections {
return
}
logger.Warnf("Error when consuming queue %s, trying to reconnect...", queueName)
time.Sleep(time.Second * 3)
connect()
time.Sleep(time.Second * 3)
goto consumer_start
}
channel, _ := conn1.Channel()
logger.Debugf("Consuming queue %s...", queueName)
msgs, err := channel.Consume(
queueName,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
logger.Errorf("Error when consuming queue %s: %s", queueName, err)
if conn1.IsClosed() {
connect()
}
}
for d := range msgs {
logger.Debugf("Got message on %s: %s", d.RoutingKey, string(d.Body))
if len(d.ReplyTo) == 0 {
go handler(d, false, func(message []byte) {})
} else {
go handler(
d,
true,
func(message []byte) {
logger.Tracef("Publishing message: %s", string(message))
PublishMessageToExchange(
"",
d.ReplyTo,
message,
d.CorrelationId,
)
},
)
}
}
logger.Infof("Consumer for %s terminated.", queueName)
channel.Close()
time.Sleep(500 * time.Millisecond)
goto consumer_start
}
func ConsumeQueueAutoAck(
queueName string,
handler func(
msg amqp.Delivery,
expectReply bool,
reply func(message []byte),
),
) {
logger.Infof("Consuming queue %s", queueName)
consumer_start:
if conn1 == nil || conn1.IsClosed() {
if cancelNewConnections {
return
}
logger.Warnf("Error when consuming queue %s, trying to reconnect...", queueName)
time.Sleep(time.Second * 3)
connect()
time.Sleep(time.Second * 3)
goto consumer_start
}
channel, _ := conn1.Channel()
logger.Infof("Consuming queue %s...", queueName)
msgs, err := channel.Consume(
queueName,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
logger.Errorf("Error on consuming %s queue: %s", queueName, err)
if conn1.IsClosed() {
connect()
}
}
for d := range msgs {
logger.Debugf("Got message on %s: %s", d.RoutingKey, string(d.Body))
if len(d.ReplyTo) == 0 {
go handler(d, false, func(message []byte) {})
} else {
go handler(
d,
true,
func(message []byte) {
logger.Debugf("Publishing message: %s", string(message))
PublishMessageToExchange(
"",
d.ReplyTo,
message,
d.CorrelationId,
)
},
)
}
}
logger.Infof("Consumer for %s terminated.", queueName)
channel.Close()
time.Sleep(500 * time.Millisecond)
goto consumer_start
}
func RPCRequest(
exchange string,
routingKey string,
message []byte,
correlationId string,
) (amqp.Delivery, error) {
logger.Infof("Sending RPC request for %s on the %s exchange", routingKey, exchange)
rpc_start:
if conn2 == nil || conn2.IsClosed() {
if cancelNewConnections {
return amqp.Delivery{}, nil
}
logger.Warnf("Error on rpc start request for %s on the %s exchange, trying to reconnect...", routingKey, exchange)
time.Sleep(time.Second * 3)
connect()
time.Sleep(time.Second * 3)
goto rpc_start
}
channel, err := conn2.Channel()
if err != nil {
logger.Errorf("RPC channel opening error: %s", err)
return amqp.Delivery{
Body: []byte(`{"code": 500, "content":{"errors":["Internal server error"]}}`),
}, err
}
taskType := fmt.Sprintf("%s_%s", exchange, routingKey)
testCircuitBreaker:
circuitBreaker, ok := circuitBreakers[taskType]
if !ok {
makeCircuitBreaker(taskType)
goto testCircuitBreaker
}
body, err := circuitBreaker.Execute(func() (interface{}, error) {
msgs, err := channel.Consume(
responseQueue,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
logger.Errorf("Couldn't consume response queue %s: %s", responseQueue, err)
return nil, err
}
logger.Debugf(
"Publishing message to %s - %s - reply-to: %s: %s ",
routingKey, correlationId, responseQueue, string(message),
)
err = channel.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: correlationId,
Body: message,
ReplyTo: responseQueue,
},
)
if err != nil {
logger.Errorf("Error on rpc request publish: %s", err)
return nil, err
}
timeout := time.Second * time.Duration(responseTimeout)
for {
select {
case <-time.After(timeout):
return nil, errors.New(fmt.Sprintf("Timeout on RPC %s", routingKey))
case d := <-msgs:
if d.CorrelationId == correlationId {
channel.Cancel(d.ConsumerTag, false)
channel.Close()
return d, nil
}
}
}
})
channel.Close()
if err != nil {
logger.Errorf("%s", err)
return amqp.Delivery{
Body: []byte(fmt.Sprintf(
`{"code": 500, "content": {"errors": ["%s"]}}`,
err.Error(),
)),
}, err
}
return body.(amqp.Delivery), nil
}
func OnceTaskIsComplete(taskId, taskType string) amqp.Delivery {
testCircuitBreaker:
circuitBreaker, ok := circuitBreakers[taskType]
if !ok {
makeCircuitBreaker(taskType)
goto testCircuitBreaker
}
// logger.Debugf(
// "Using circuit breaker %s for Request %s of %s type",
// circuitBreaker.Name(), taskId, taskType,
// )
logger.Debugf("Failures for %s: %d", taskType, circuitBreaker.Counts().TotalFailures)
body, err := circuitBreaker.Execute(func() (interface{}, error) {
channel, _ := conn1.Channel()
logger.Debugf("Waiting task on %s", responseQueue)
msgs, err := channel.Consume(
responseQueue,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
logger.Errorf("Error on consuming %s queue: %s", responseQueue, err)
}
timeout := time.Second * time.Duration(responseTimeout)
for {
select {
case <-time.After(timeout):
logger.Errorf("Task %s timeout!", taskId)
return nil, errors.New("Timeout")
case d := <-msgs:
logger.Debugf("Received task %s", d.CorrelationId)
if d.CorrelationId == taskId {
// d.Ack(true)
channel.Cancel(d.ConsumerTag, false)
channel.Close()
return d, nil
}
// d.Nack(true, true)
}
}
})
if err != nil {
logger.Error(err)
return amqp.Delivery{
Body: []byte(`{"code": 500}`),
}
}
return body.(amqp.Delivery)
}
func WithChannel(handler func(channel *amqp.Channel)) {
handler(ch1)
}
func StartMessageQueue() {
cancelNewConnections = false
setupConfig()
logger.Info("Starting message queue...")
connect()
setupChannels()
setupExchanges()
setupQueues()
logger.Info("Message queue started.")
}
func StopMessageQueue() {
logger.Info("Stopping message queue...")
closeChannels()
disconnect()
logger.Info("Message queue stopped.")
}
func PublishMessageToExchange(
exchange string,
routingKey string,
message []byte,
correlationId string,
) error {
message_publish:
logger.Infof(
"Publishing message to %s - %s: %s: %s ",
routingKey, correlationId, string(message),
)
err := ch3.Publish(
exchange,
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: correlationId,
Body: message,
},
)
if err != nil {
logger.Errorf("Message publish error: %s", err)
if conn3 == nil || conn3.IsClosed() || ch3.IsClosed() {
if !ch3.IsClosed() {
ch3.Close()
}
logger.Warnf("Error on message publish, trying to reconnect...")
time.Sleep(time.Second * 3)
connect()
if conn3 != nil && !conn3.IsClosed() {
ch3, _ = conn3.Channel()
}
time.Sleep(time.Second * 3)
goto message_publish
}
}
return nil
}