From b97618ff4dde112bf12c42fedc5e681cf42b964b Mon Sep 17 00:00:00 2001 From: Kamalov Erik <kamalov@deck.lc> Date: Fri, 1 Oct 2021 10:42:38 +0300 Subject: [PATCH] s --- channel.go | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/channel.go b/channel.go index 5b6701f..cc8aaee 100644 --- a/channel.go +++ b/channel.go @@ -29,7 +29,7 @@ func (channel *Channel) restore() (err error) { оÑтавьте backoff параметр равным nil, чтобы не включать повторную обработку */ func (channel *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table, backoff *Backoff, - callback func(msg amqp.Delivery) (callbackError error), + callback func(msg amqp.Delivery, isLastAttempt bool) (callbackError error), resolve func(msg amqp.Delivery), reject func(msg amqp.Delivery, callbackError error, backoffError error)) (err error) { var isBackoffEnabled = true @@ -81,23 +81,24 @@ func (channel *Channel) Consume(queue, consumer string, autoAck, exclusive, noLo select { case msg := <-messages: { - var callbackErr = callback(msg) + var attempt = 0 + if val, ok := msg.Headers["attempt"]; ok { + switch val.(type) { + case int32: + attempt = int(val.(int32)) + } + } + var timeout = exponentialBackoff(attempt, backoff.MinTimeout, backoff.MaxTimeout, backoff.UseJitter).Milliseconds() + attempt += 1 + + var callbackErr = callback(msg, !isBackoffEnabled || (attempt == backoff.MaxAttemptsCount && backoff.MaxAttemptsCount != 0) || (backoff.StopOnMaxTimeoutReached == true && timeout >= backoff.MaxTimeout.Milliseconds())) if callbackErr != nil { var backoffError error if isBackoffEnabled { - var attempt = 0 - if val, ok := msg.Headers["attempt"]; ok { - switch val.(type) { - case int32: - attempt = int(val.(int32)) - } - } - var timeout = exponentialBackoff(attempt, backoff.MinTimeout, backoff.MaxTimeout, backoff.UseJitter).Milliseconds() var retryQueue = queue + ".retry" if backoff.UseMultipleQueue == true { retryQueue = queue + "." + strconv.FormatInt(timeout, 10) } - attempt += 1 var publishing = amqp.Publishing{ Body: msg.Body, Priority: msg.Priority + 1, @@ -114,10 +115,12 @@ func (channel *Channel) Consume(queue, consumer string, autoAck, exclusive, noLo reject(msg, callbackErr, backoffError) } } else { - var errorQueue = queue + ".error" - backoffError = channel.Channel.Publish("", errorQueue, false, false, publishing) - if backoffError != nil { - reject(msg, callbackErr, backoffError) + if backoff.MaxAttemptsCount != 0 { + var errorQueue = queue + ".error" + backoffError = channel.Channel.Publish("", errorQueue, false, false, publishing) + if backoffError != nil { + reject(msg, callbackErr, backoffError) + } } reject(msg, callbackErr, LastAttemptError) } -- GitLab