queue.go 1.83 KB
Newer Older
Камалов Эрик's avatar
Камалов Эрик committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package amqp

import (
	"github.com/streadway/amqp"
	"log"
	"time"
)

const (
	actionRead  = "read queue"
	actionWrite = "write queue"
)

type Queue struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
	outChan       *chan amqp.Delivery
	action        string
	queueName     string

	channel    *amqp.Channel
	connection *Connection
}

func (c *Queue) openChannel() (*amqp.Channel, error) {
	if c.channel != nil {
		c.channel.Close()
	}

	channel, err := c.connection.amqpConnection.Channel()
	if err != nil {
		return nil, err
	}
	if err := channel.Qos(c.PrefetchCount, c.PrefetchSize, c.Global); err != nil {
		defer channel.Close()
		return nil, err
	}

	return channel, nil
}

func (q *Queue) run() error {
	channel, err := q.openChannel()
	if err != nil {
		return err
	}
	q.channel = channel

	if q.action == actionRead {
		out, err := q.channel.Consume(q.queueName, "", false, false, false, false, nil)
		if err != nil {
			return err
		}

		// Прослойка для отслеживания падения queue
		go func(q *Queue, out <-chan amqp.Delivery) {
			for {
				select {
				case msg, ok := <-out:
					if !ok {
						if q.connection.amqpConnection.IsClosed() {
							return
						}
						go func(q *Queue) {
							for {
								if err := q.run(); err != nil {
									time.Sleep(500 * time.Millisecond)
									log.Printf("rerun queue error: %v\n", err)
									continue
								}
								break
							}
						}(q)
						return
					}
					*q.outChan <- msg
				}
			}
		}(q, out)
	}

	return nil
}

func (q Queue) stop() error {
	if q.channel != nil {
		q.channel.Close()
		q.channel = nil
	}
	return nil
}

func (q *Queue) Send(exchange string, route string, params amqp.Publishing) error {
	var err error
	err = q.channel.Publish(
		exchange,
		route,
		false,
		false,
		params,
	)
	if err != nil {
		return err
	}
	return err
}