Browse Source

Change AMQP timeout code.

master
Birunda 2 years ago
parent
commit
c49a60cddb
  1. 71
      src/kunhatan_amqp_clj/core.clj

71
src/kunhatan_amqp_clj/core.clj

@ -37,8 +37,8 @@
(def amqp-timeout (atom (* 3 1000)))
(def amqp-timeout-response
{:meta {}
:message {"code" 500, "content" {"errors" ["Timeout"]}}
:code 500
:message {"code" 503, "content" {"errors" ["Timeout"]}}
:code 503
:content {"errors" ["Timeout"]}})
(defn set-worker-name [val]
@ -213,35 +213,38 @@
[queue-name callback]
(log/trace (format "Consuming queue %s" queue-name))
(let [channel_ (lch/open @connection1)]
(lc/subscribe
channel_
queue-name
(fn [channel meta ^bytes payload]
(try
(let [ack #(lb/ack channel (:delivery-tag meta) true)
nack #(lb/nack channel (:delivery-tag meta) true true)
message (json/parse-string (String. payload "UTF-8"))
task-id (get message "request_id")
process-id (get message "process_id")
session-id (get message "session_id")
reply (if (not (empty? (:reply-to meta)))
#(publish-message-to-exchange
""
(:reply-to meta)
%
(:correlation-id meta))
nil)]
(log/trace (format "Got message from %s: %s" queue-name (json/encode message)))
(apply callback [{:message message
:task-id task-id
:process-id process-id
:session-id session-id
:ack ack
:nack nack
:reply reply
:meta meta}]))
(catch Exception e
(pprint e)
(log/error (ex-info e))))) )))
(let
[channel_ (lch/open @connection1)
consumer-tag
(lc/subscribe
channel_
queue-name
(fn [channel meta ^bytes payload]
(try
(let [ack #(lb/ack channel (:delivery-tag meta) true)
nack #(lb/nack channel (:delivery-tag meta) true true)
message (json/parse-string (String. payload "UTF-8"))
task-id (get message "request_id")
process-id (get message "process_id")
session-id (get message "session_id")
reply (if (not (empty? (:reply-to meta)))
#(publish-message-to-exchange
""
(:reply-to meta)
%
(:correlation-id meta))
nil)]
(log/trace (format "Got message from %s: %s" queue-name (json/encode message)))
(apply callback [{:message message
:task-id task-id
:process-id process-id
:session-id session-id
:ack ack
:nack nack
:reply reply
:meta meta}]))
(catch Exception e
(pprint e)
(log/error (ex-info e))))))]
#(lb/cancel channel_ consumer-tag)))
Loading…
Cancel
Save