utils.go 1.69 KB
Newer Older
Камалов Эрик's avatar
Камалов Эрик committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package amqp

import oAmqp "github.com/streadway/amqp"

func (c *Connection) ReadQueue(queueName string, out *chan oAmqp.Delivery, prefetchCount, prefetchSize int, global bool) (*Queue, error) {
	queue := &Queue{
		PrefetchCount: prefetchCount,
		PrefetchSize:  prefetchSize,
		Global:        global,
		connection:    c,
		outChan:       out,
		action:        actionRead,
		queueName:     queueName,
	}

	c.queues = append(c.queues, queue)

	if err := queue.run(); err != nil {
		return nil, err
	}

	return queue, nil
}

func (c *Connection) WriteQueue() (*Queue, error) {
	queue := &Queue{
		PrefetchCount: 1,
		PrefetchSize:  0,
		Global:        false,
		connection:    c,
		action:        actionWrite,
	}

	c.queues = append(c.queues, queue)

	if err := queue.run(); err != nil {
		return nil, err
	}

	return queue, nil
}

func (c *Connection) ReadPriorityQueue(queueName string, callback func(msg oAmqp.Delivery) error, prefetchCount, prefetchSize int, global bool) (*PriorityQueue, error) {
	queue := &PriorityQueue{
		Queue: &Queue{
			PrefetchCount: prefetchCount,
			PrefetchSize:  prefetchSize,
			Global:        global,
			connection:    c,
			action:        actionRead,
			queueName:     queueName,
		},
		callback: callback,
	}

	c.queues = append(c.queues, queue.Queue)

	if err := queue.run(); err != nil {
		return nil, err
	}

	return queue, nil
}

func (c *Connection) WritePriorityQueue() (*PriorityQueue, error) {
	queue := &PriorityQueue{
		Queue: &Queue{
			PrefetchCount: 1,
			PrefetchSize:  0,
			Global:        false,
			connection:    c,
			action:        actionWrite,
		},
	}

	c.queues = append(c.queues, queue.Queue)

	if err := queue.run(); err != nil {
		return nil, err
	}

	return queue, nil
}