成功最有效的方法就是向有经验的人学习!

python中使用rabbitmq消息队列、延迟队列

python的消息队列用起来还是比较简单的,需要安装python的pika包。

pip install pika

封装一个类

import pika

# 创建
class CreateChannel:

    # 连接配置
    host = 'localhost'
    port = 5672
    user = 'guest'
    password = 'guest'

    channel = None
    exchange = None
    queue = None
    connection = None

    # 是否使用延迟队列
    is_delay = False

    def __init__(self,exchange_name,queue_name,routing_key_name,is_delay=False):
        self.exchange_name = exchange_name
        self.queue_name = queue_name
        self.routing_key_name = routing_key_name
        self.is_delay = is_delay

        # 建立一个实例
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host, port=self.port,credentials=pika.credentials.PlainCredentials(username=self.user, password=self.password))
        )
        # 声明一个管道,在管道里发消息
        self.channel = self.connection.channel()
        # 创建队列
        self.channel.queue_declare(queue=self.queue_name,
                              durable=True,  # 队列持久化
                              )
        # 创建交换机
        if is_delay == False:
            self.channel.exchange_declare(exchange=self.exchange_name)
        else:
            self.channel.exchange_declare(exchange=self.exchange_name, exchange_type="x-delayed-message",
                                 arguments={"x-delayed-type": "direct"})  # 实现延迟队列
        # 绑定队列和交换机
        self.channel.queue_bind(queue=self.queue_name, exchange=self.exchange_name, routing_key=self.routing_key_name)

    # 消费函数,需要在消费者代码里重写
    def callback(self):
        pass

    # 绑定消费消息的执行函数
    def consumer(self):
        self.channel.basic_consume(
            on_message_callback=self.callback,  # 如果收到消息,就调用callback函数来处理消息
            queue=self.queue_name,  # 你要从那个队列里收消息
            # auto_ack=True # 消息消费执行完毕后,是否自动回复消息被处理
        )
        # 开始消费消息
        self.channel.start_consuming()

    # 投递消息
    def publish(self,msg,ttl=3000):
        headers = {}
        if self.is_delay == True:
            headers = {"x-delay": ttl}
        properties = pika.BasicProperties(
              delivery_mode=2,  # 让消息持久化
              headers=headers,
        )

        self.channel.basic_publish(exchange=self.exchange_name,
                              routing_key=self.routing_key_name,
                              body=msg,  # 消息内容
                              properties=properties
       )

    # 队列关闭
    def close(self):
        self.connection.close()

生产者

import random
from rabbitmq import create_mq
# 生产者

# channel = create.create_channel(exchange_name="sms_exchange",queue_name="sms_queue",routing_key_name='sms_send')
channel = create_mq.CreateChannel(exchange_name="sms_exchange_delay", queue_name="sms_queue_delay", routing_key_name='sms_send_delay', is_delay=True)# 延迟队列

# 发送5次消息
for i in range(5):
    # 消息据数
    data_package = str({
        'code': random.randint(111111, 999999),
        'phone': "13%d" % random.randint(111111111, 999999999),
    })
    # channel.publish(data_package)
    channel.publish(msg=data_package,ttl=i*1000)# 延迟队列
# 关闭通道
channel.close()

消费者

import time
from rabbitmq import create_mq
# 消费者

# channel = create.create_channel(exchange_name="sms_exchange",queue_name="sms_queue",routing_key_name='sms_send')
channel = create_mq.CreateChannel(exchange_name="sms_exchange_delay", queue_name="sms_queue_delay", routing_key_name='sms_send_delay', is_delay=True)# 延迟队列

# 实现消费函数
def callback(ch, method, properties, body):
    # 把传过来的字符串转成字典
    body = eval(body)
    print("接收数据:{}".format(body))
    # 模拟耗时
    time.sleep(3)

    # 告诉生成者,消息处理完成
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 绑定消费函数
channel.callback = callback
# 开始消费
channel.consumer()
赞(0) 打赏
未经允许不得转载:陈桂林博客 » python中使用rabbitmq消息队列、延迟队列
分享到

大佬们的评论 抢沙发

全新“一站式”建站,高质量、高售后的一条龙服务

微信 抖音 支付宝 百度 头条 快手全平台打通信息流

橙子建站.极速智能建站8折购买虚拟主机

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

登录

找回密码

注册