From b97618ff4dde112bf12c42fedc5e681cf42b964b Mon Sep 17 00:00:00 2001
From: Kamalov Erik <kamalov@deck.lc>
Date: Fri, 1 Oct 2021 10:42:38 +0300
Subject: [PATCH] s

---
 channel.go | 33 ++++++++++++++++++---------------
 1 file changed, 18 insertions(+), 15 deletions(-)

diff --git a/channel.go b/channel.go
index 5b6701f..cc8aaee 100644
--- a/channel.go
+++ b/channel.go
@@ -29,7 +29,7 @@ func (channel *Channel) restore() (err error) {
 	оставьте backoff параметр равным nil, чтобы не включать повторную обработку
 */
 func (channel *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table, backoff *Backoff,
-	callback func(msg amqp.Delivery) (callbackError error),
+	callback func(msg amqp.Delivery, isLastAttempt bool) (callbackError error),
 	resolve func(msg amqp.Delivery),
 	reject func(msg amqp.Delivery, callbackError error, backoffError error)) (err error) {
 	var isBackoffEnabled = true
@@ -81,23 +81,24 @@ func (channel *Channel) Consume(queue, consumer string, autoAck, exclusive, noLo
 			select {
 			case msg := <-messages:
 				{
-					var callbackErr = callback(msg)
+					var attempt = 0
+					if val, ok := msg.Headers["attempt"]; ok {
+						switch val.(type) {
+						case int32:
+							attempt = int(val.(int32))
+						}
+					}
+					var timeout = exponentialBackoff(attempt, backoff.MinTimeout, backoff.MaxTimeout, backoff.UseJitter).Milliseconds()
+					attempt += 1
+
+					var callbackErr = callback(msg, !isBackoffEnabled || (attempt == backoff.MaxAttemptsCount && backoff.MaxAttemptsCount != 0) || (backoff.StopOnMaxTimeoutReached == true && timeout >= backoff.MaxTimeout.Milliseconds()))
 					if callbackErr != nil {
 						var backoffError error
 						if isBackoffEnabled {
-							var attempt = 0
-							if val, ok := msg.Headers["attempt"]; ok {
-								switch val.(type) {
-								case int32:
-									attempt = int(val.(int32))
-								}
-							}
-							var timeout = exponentialBackoff(attempt, backoff.MinTimeout, backoff.MaxTimeout, backoff.UseJitter).Milliseconds()
 							var retryQueue = queue + ".retry"
 							if backoff.UseMultipleQueue == true {
 								retryQueue = queue + "." + strconv.FormatInt(timeout, 10)
 							}
-							attempt += 1
 							var publishing = amqp.Publishing{
 								Body:     msg.Body,
 								Priority: msg.Priority + 1,
@@ -114,10 +115,12 @@ func (channel *Channel) Consume(queue, consumer string, autoAck, exclusive, noLo
 									reject(msg, callbackErr, backoffError)
 								}
 							} else {
-								var errorQueue = queue + ".error"
-								backoffError = channel.Channel.Publish("", errorQueue, false, false, publishing)
-								if backoffError != nil {
-									reject(msg, callbackErr, backoffError)
+								if backoff.MaxAttemptsCount != 0 {
+									var errorQueue = queue + ".error"
+									backoffError = channel.Channel.Publish("", errorQueue, false, false, publishing)
+									if backoffError != nil {
+										reject(msg, callbackErr, backoffError)
+									}
 								}
 								reject(msg, callbackErr, LastAttemptError)
 							}
-- 
GitLab