1 module hunt.framework.queue.RedisQueue;
2 
3 import hunt.framework.task.SerializableTask;
4 
5 import hunt.util.worker.Task;
6 
7 import hunt.collection.List;
8 import hunt.redis.Redis;
9 import hunt.redis.RedisPool;
10 import hunt.Long;
11 import hunt.logging;
12 
13 import core.thread;
14 
15 
/// Delegate type for a parameterless factory returning a SerializableTask.
/// RedisQueue.pop() invokes such a delegate to obtain the task object that
/// the message read from Redis is then deserialized into.
alias RedisTaskBuilder = SerializableTask delegate();
17 
/**
 * A TaskQueue whose pending tasks are kept in a Redis list.
 *
 * Tasks are serialized to bytes and LPUSHed onto the list named by `channel`.
 * `pop` uses BRPOPLPUSH to move the oldest entry onto a backup list
 * (`channel ~ BACKUP_CHANNEL_POSTFIX`) and then removes it from that backup
 * list with LREM before handing the task to the caller.
 *
 * Only tasks that are (or derive from) SerializableTask can be queued.
 *
 * See_Also:
 *  https://blog.rapid7.com/2016/05/04/queuing-tasks-with-redis/
 *  https://danielkokott.wordpress.com/2015/02/14/redis-reliable-queue-pattern/
 *  https://blog.csdn.net/qq_34212276/article/details/78455004
 */
class RedisQueue : TaskQueue {
    private string _channel;
    private RedisPool _pool;
    private RedisTaskBuilder _taskBuilder;

    /// Suffix appended to the channel name to form the backup list name.
    enum string BACKUP_CHANNEL_POSTFIX = "_backup"; // backup channel

    /**
     * Creates a queue bound to the "default-queue" channel.
     *
     * Params:
     *  pool = connection pool from which a Redis client is borrowed for
     *         every push/pop and returned afterwards.
     */
    this(RedisPool pool) {
        _channel = "default-queue";
        _pool = pool;
        _taskBuilder = &buildTask;
    }

    /// Name of the Redis list used as the queue.
    string channel() {
        return _channel;
    }

    /// ditto
    void channel(string value) {
        _channel = value;
    }

    /// Default task factory: returns a plain SerializableTask.
    protected SerializableTask buildTask() {
        return new SerializableTask();
    }

    /**
     * Replaces the factory used by `pop` to create the object a message is
     * deserialized into.
     *
     * Params:
     *  value = the new factory; must not be null.
     */
    void taskBuilder(RedisTaskBuilder value) {
        assert(value !is null);
        _taskBuilder = value;
    }

    /**
     * Serializes `task` and pushes it onto the head of the channel list.
     *
     * Params:
     *  task = the task to enqueue; must be a SerializableTask.
     *
     * Throws: Exception if `task` is not a SerializableTask (this includes
     *         a null reference, for which the cast also yields null).
     */
    override void push(Task task) {
        SerializableTask redisTask = cast(SerializableTask)task;

        if(redisTask is null) {
            throw new Exception("Only SerializableTask supported");
        }

        ubyte[] message = redisTask.serialize();

        Redis redis = _pool.borrow();
        // Hand the connection back to the pool on every exit path.
        scope(exit) {
            _pool.returnObject(redis);
        }

        Long r = redis.lpush(cast(const(ubyte)[])channel, cast(const(ubyte)[])message);
        version(HUNT_REDIS_DEBUG) {
            if(r is null) {
                warning("result is null");
            } else {
                infof("count: %d", r.value());
            }
        }
    }

    /// Always reports false; the Redis list is not queried.
    override bool isEmpty() {
        return false;
    }

    /**
     * Takes the next message from the channel list and returns it as a task.
     *
     * The message is fetched with BRPOPLPUSH using a timeout of 0 (per the
     * Redis command documentation this waits without a time limit), which
     * also appends it to the backup list. The message is then removed from
     * the backup list and deserialized into a task created by the
     * configured task builder.
     *
     * If the builder throws or returns null, a plain SerializableTask is
     * used instead. A failure while removing the backup entry is logged and
     * does not affect the task that is returned.
     *
     * Returns: the deserialized task.
     */
    override Task pop() {

        Redis redis = _pool.borrow();
        // Hand the connection back to the pool on every exit path.
        scope(exit) {
            _pool.returnObject(redis);
        }

        version(HUNT_DEBUG) infof("waiting .....");
        string backupChannel = channel ~ BACKUP_CHANNEL_POSTFIX;
        const(ubyte)[] resultMessage = redis.brpoplpush(cast(const(ubyte)[])channel,
            cast(const(ubyte)[])backupChannel, 0);

        version(HUNT_DEBUG) {
            tracef("channel: %s Message: %s", channel, cast(string)resultMessage);
        }

        // Build the task on its own so that a later Redis error cannot
        // discard a task the builder produced successfully.
        SerializableTask task;
        try {
            task = _taskBuilder();
        } catch (Exception ex) {
            warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }

        // Fall back to the plain task type when the builder failed or
        // handed back nothing, so deserialize() below never hits null.
        if(task is null) {
            task = new SerializableTask();
        }

        // Remove the message from the backup list; best effort only.
        try {
            Long v = redis.lrem(cast(const(ubyte)[])backupChannel, 1, resultMessage);
            version(HUNT_REDIS_DEBUG) {
                // Check the LREM result itself before dereferencing it.
                if(v is null) {
                    warning("result is null");
                } else {
                    infof("count: %d", v.value());
                }
            }
        } catch (Exception ex) {
            warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }

        task.deserialize(resultMessage);
        return task;
    }
}
123