python的消息队列用起来还是比较简单的,需要安装python的pika包。
pip install pika
封装一个类
import pika
# 创建
class CreateChannel:
# 连接配置
host = 'localhost'
port = 5672
user = 'guest'
password = 'guest'
channel = None
exchange = None
queue = None
connection = None
# 是否使用延迟队列
is_delay = False
def __init__(self,exchange_name,queue_name,routing_key_name,is_delay=False):
self.exchange_name = exchange_name
self.queue_name = queue_name
self.routing_key_name = routing_key_name
self.is_delay = is_delay
# 建立一个实例
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, port=self.port,credentials=pika.credentials.PlainCredentials(username=self.user, password=self.password))
)
# 声明一个管道,在管道里发消息
self.channel = self.connection.channel()
# 创建队列
self.channel.queue_declare(queue=self.queue_name,
durable=True, # 队列持久化
)
# 创建交换机
if is_delay == False:
self.channel.exchange_declare(exchange=self.exchange_name)
else:
self.channel.exchange_declare(exchange=self.exchange_name, exchange_type="x-delayed-message",
arguments={"x-delayed-type": "direct"}) # 实现延迟队列
# 绑定队列和交换机
self.channel.queue_bind(queue=self.queue_name, exchange=self.exchange_name, routing_key=self.routing_key_name)
# 消费函数,需要在消费者代码里重写
def callback(self):
pass
# 绑定消费消息的执行函数
def consumer(self):
self.channel.basic_consume(
on_message_callback=self.callback, # 如果收到消息,就调用callback函数来处理消息
queue=self.queue_name, # 你要从那个队列里收消息
# auto_ack=True # 消息消费执行完毕后,是否自动回复消息被处理
)
# 开始消费消息
self.channel.start_consuming()
# 投递消息
def publish(self,msg,ttl=3000):
headers = {}
if self.is_delay == True:
headers = {"x-delay": ttl}
properties = pika.BasicProperties(
delivery_mode=2, # 让消息持久化
headers=headers,
)
self.channel.basic_publish(exchange=self.exchange_name,
routing_key=self.routing_key_name,
body=msg, # 消息内容
properties=properties
)
# 队列关闭
def close(self):
self.connection.close()
生产者
import random
from rabbitmq import create_mq
# 生产者
# channel = create.create_channel(exchange_name="sms_exchange",queue_name="sms_queue",routing_key_name='sms_send')
channel = create_mq.CreateChannel(exchange_name="sms_exchange_delay", queue_name="sms_queue_delay", routing_key_name='sms_send_delay', is_delay=True)# 延迟队列
# 发送5次消息
for i in range(5):
# 消息据数
data_package = str({
'code': random.randint(111111, 999999),
'phone': "13%d" % random.randint(111111111, 999999999),
})
# channel.publish(data_package)
channel.publish(msg=data_package,ttl=i*1000)# 延迟队列
# 关闭通道
channel.close()
消费者
import time
from rabbitmq import create_mq
# 消费者
# channel = create.create_channel(exchange_name="sms_exchange",queue_name="sms_queue",routing_key_name='sms_send')
channel = create_mq.CreateChannel(exchange_name="sms_exchange_delay", queue_name="sms_queue_delay", routing_key_name='sms_send_delay', is_delay=True)# 延迟队列
# 实现消费函数
def callback(ch, method, properties, body):
# 把传过来的字符串转成字典
body = eval(body)
print("接收数据:{}".format(body))
# 模拟耗时
time.sleep(3)
# 告诉生成者,消息处理完成
ch.basic_ack(delivery_tag=method.delivery_tag)
# 绑定消费函数
channel.callback = callback
# 开始消费
channel.consumer()