module hunt.framework.queue.RedisQueue;

import hunt.framework.task.SerializableTask;

import hunt.util.worker.Task;

import hunt.collection.List;
import hunt.redis.Redis;
import hunt.redis.RedisPool;
import hunt.Long;
import hunt.logging;

import core.thread;


/// Factory delegate used by `RedisQueue.pop` to create the task object that
/// the popped message is deserialized into.
alias RedisTaskBuilder = SerializableTask delegate();

/**
 * Task queue backed by a Redis list.
 *
 * `push` serializes a `SerializableTask` and LPUSHes it onto the list named by
 * `channel`. `pop` moves one message from that list to a backup list
 * (`channel ~ BACKUP_CHANNEL_POSTFIX`) with BRPOPLPUSH, removes it from the
 * backup list again with LREM, and returns a task deserialized from it.
 *
 * References:
 * https://blog.rapid7.com/2016/05/04/queuing-tasks-with-redis/
 * https://danielkokott.wordpress.com/2015/02/14/redis-reliable-queue-pattern/
 * https://blog.csdn.net/qq_34212276/article/details/78455004
 */
class RedisQueue : TaskQueue {
    private string _channel;
    private RedisPool _pool;
    private RedisTaskBuilder _taskBuilder;

    /// Suffix appended to `channel` to form the name of the backup list.
    enum string BACKUP_CHANNEL_POSTFIX = "_backup";

    /**
     * Creates a queue on the channel "default-queue" that builds plain
     * `SerializableTask` objects (via `buildTask`) when popping.
     *
     * Params:
     *   pool = pool from which a Redis connection is borrowed for each operation
     */
    this(RedisPool pool) {
        _channel = "default-queue";
        _pool = pool;
        _taskBuilder = &buildTask;
    }

    /// Returns: the name of the Redis list used as the queue.
    string channel() {
        return _channel;
    }

    /// Sets the name of the Redis list used as the queue.
    void channel(string value) {
        _channel = value;
    }

    /// Default task factory: returns a new, empty `SerializableTask`.
    protected SerializableTask buildTask() {
        return new SerializableTask();
    }

    /**
     * Replaces the factory used by `pop` to create task objects.
     *
     * Params:
     *   value = the new factory; must not be null (checked with `assert` only)
     */
    void taskBuilder(RedisTaskBuilder value) {
        assert(value !is null);
        _taskBuilder = value;
    }

    /**
     * Serializes `task` and pushes it onto the head of the channel list.
     *
     * Params:
     *   task = the task to enqueue; must be a `SerializableTask`
     *
     * Throws: Exception if `task` is not a `SerializableTask` (this includes null,
     *         because the cast then yields null).
     */
    override void push(Task task) {
        SerializableTask redisTask = cast(SerializableTask)task;

        if(redisTask is null) {
            throw new Exception("Only SerializableTask supported");
        }

        ubyte[] message = redisTask.serialize();

        Redis redis = _pool.borrow();
        // Hand the connection back to the pool however this method exits.
        scope(exit) {
            _pool.returnObject(redis);
        }

        Long r = redis.lpush(cast(const(ubyte)[])channel, cast(const(ubyte)[])message);
        version(HUNT_REDIS_DEBUG) {
            if(r is null) {
                warning("result is null");
            } else {
                infof("count: %d", r.value());
            }
        }
    }

    /// Always returns false; the Redis list is not queried here.
    override bool isEmpty() {
        return false;
    }

    /**
     * Takes the next message from the channel list and returns it as a task.
     *
     * The message is moved to the backup list by BRPOPLPUSH (called with a
     * timeout argument of 0, which in Redis means "block until an element is
     * available"), then one matching entry is removed from the backup list
     * with LREM. Failures while building the task or removing the backup entry
     * are logged and do not prevent the task from being returned.
     *
     * Returns: a task created by the configured task builder (or a plain
     *          `SerializableTask` if the builder threw or produced null),
     *          deserialized from the popped message.
     */
    override Task pop() {
        Redis redis = _pool.borrow();
        // Hand the connection back to the pool however this method exits.
        scope(exit) {
            _pool.returnObject(redis);
        }

        version(HUNT_DEBUG) infof("waiting .....");

        string backupChannel = channel ~ BACKUP_CHANNEL_POSTFIX;
        const(ubyte)[] resultMessage = redis.brpoplpush(cast(const(ubyte)[])channel,
                cast(const(ubyte)[])backupChannel, 0);

        version(HUNT_DEBUG) {
            tracef("channel: %s Message: %s", channel, cast(string)resultMessage);
        }

        SerializableTask task;
        try {
            task = _taskBuilder();

            Long v = redis.lrem(cast(const(ubyte)[])backupChannel, 1, resultMessage);
            version(HUNT_REDIS_DEBUG) {
                // Guard the value that is dereferenced below (the LREM result).
                if(v is null) {
                    warning("result is null");
                } else {
                    infof("count: %d", v.value());
                }
            }
        } catch (Exception ex) {
            warning(ex.msg);
            version(HUNT_DEBUG) warning(ex);
        }

        // Fall back to a plain task only when the builder did not deliver one.
        // If just the LREM call failed, the task built above is kept.
        if(task is null) {
            task = new SerializableTask();
        }

        task.deserialize(resultMessage);
        return task;
    }
}