如何利用消息队列处理秒杀或者抢票需求

注:更新至2018年4月21日,已经将源码放到github中,https://github.com/ibyeyoga/GrabTicket

场景

在高并发应用中,一个非常常见的案例就是——“秒杀”或者“抢票”,它们虽然名字不同,但是本质是一样的。可以用相同的解决方法去解决它们。


思路

一般思路是通过将用户的请求放到预先设定好的定长的队列中去,然后等系统一个一个地去处理,当然一开始这个队列的最大值就已经设置好了,所以只要不能进入队列的用户,都可以返回抢票失败。我在公司的时候也是通过这种思路去解决。这样做有好也有不好,是否采用这种做法根据需求而定。


我这里还有一种方法,就是当发放票券的人是生产者,抢票的用户是消费者,那么我一开始就往队列中塞进一定数量的票,而活动开始的时候,用户就开始消费,如果消费成功就将票券入库,如果失败就重新放回队列中去。两种做法各有千秋,还是那句老话,根据需求而定。下面代码主要介绍我这种做法。

关键代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
GrabCore.php
<?php
namespace yoga\grabticket;
class GrabCore
{
protected $port = 61613;
protected $host = 'tcp://localhost';
protected $username = 'admin';
protected $password = 'admin';
public $stomp;
public $queue = '/queue/activity/ticket-1';
private static $_instance = null;
static public function getInstance(){
if(!(self::$_instance instanceof Grabber)){
self::$_instance = new Grabber();
$uri = self::$_instance->host . ':' . self::$_instance->port;
try{
self::$_instance->stomp = new \Stomp($uri, self::$_instance->username, self::$_instance->password);
self::$_instance->stomp->setReadTimeout(1);
}
catch (\StompException $e){
echo 'Error: ' . $e->getMessage();
}
}
return self::$_instance;
}
private function __clone()
{
}
public static function close(){
$stomp = self::getInstance()->stomp;
unset($stomp);
}
function __destruct()
{
self::close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Grabber.php
<?php
namespace yoga\grabticket;
class Grabber extends GrabCore
{
public static function grab(\Closure $success, \Closure $failure, $_queue = null){
$stomp = self::getInstance()->stomp;
$queue = $_queue ?? self::getInstance()->queue;
$isSubscribe = $stomp->subscribe($queue);
try{
if($isSubscribe){
if($stomp->hasFrame()){
$frame = $stomp->readFrame();
if($stomp->ack($frame)){
//成功处理操作
$flag = $success($frame->body);
//如果抢票失败,再放回队列
if(!$flag){
$tMark = 're';
$stomp->begin('re');
$stomp->send($queue, $frame->body);
if($stomp->commit($tMark)){
echo '有一张票券入库异常,已经插回队列。';
}
else{
$stomp->abort($tMark);
echo '插入队列异常:' . $frame->body;
}
}
}
else{
$failure('没抢到票哦,请稍后再试试~');
}
}
else{
$failure('没有票了~');
}
}
}
catch (\Exception $exception){
$failure($exception->getMessage());
}
finally{
$stomp->unsubscribe($queue);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
TicketProducer.php
<?php
namespace yoga\grabticket;
class TicketProducer extends GrabCore
{
public static function provide($number,$_queue = null){
$tMark = 'batch';
$stomp = self::getInstance()->stomp;
$queue = $_queue ?? self::getInstance()->queue;
$stomp->begin($tMark);
for($i = 1; $i <= $number; $i++){
$stomp->send($queue,self::generateTicket($i), [
'persistent' => 'true'
]);
}
if($stomp->commit($tMark)){
echo "生成 $number 张票券成功!";
}
else{
$stomp->abort($tMark);
}
}
private static function generateTicket($seed){
$groupCode = ['A','B','C'];
return json_encode([
'tno' => $groupCode[array_rand($groupCode,1)] . substr(microtime(), 3,7) . random_int(1, 99) . $seed
]);
}
}

生成票券:TicketProducer::provide(10);
upload successful
抢票:

1
2
3
4
5
Grabber::grab(function($msgBody){
//成功操作,我这里是输出用户抢票情况
},function($error){
echo $error;
});

upload successful

upload successful

------ 本文结束 ------

版权声明

yoGa's Blog by yoga lee is licensed under a Creative Commons BY-NC-ND 4.0 International License.
yoga lee创作并维护的yoGa's Blog采用创作共用保留署名-非商业-禁止演绎4.0国际许可证
本文首发于yoGa's Bloghttp://yoga.ibye.cn ),版权所有,若需转载请注明出处,谢谢支持。