RabbitMQ ile Python Uygulamaları – 3

Merhaba

Bir önceki yazıda RabbitMQ’nun kuyruktaki mesajı sadece 1 adet işçiye gönderdiğini görmüştük. Bu yazıda ise bir mesajın birden fazla işçiye aynı anda gönderilmesi anlatılacaktır.

RabbitMQ yazı serisinin ikinci bölümününe https://www.mehmetince.net/rabbitmq-ile-python-uygulamalari-2/ adresinden erişebilirsiniz.

Exchanges

Bir önceki yazıda kuyruğa mesaj gönderen publisher’ların aslında direk kuyruk ile değil, mesajları kuyruğa ekleme işiyle ilgilenen exchange ile konuştuklarından bahsedilmişti.

Exchange’ler tahmin edildiği üzere, mesajların ilgili kuyruklara yerleştirilmesiyle görevlidirler. RabbitMQ yapısında direct, topic, headers ve fanout olmak üzere hazır gelen exchange tipleri bulunmaktadır.

Önceki örneklerde basic_puslish metodu exchange ismini boş olarak göndermekteydi. Bu durumda gönderilen mesajı default tanımlı exchange yapısı karşılamaktadır.

Publisher.py örneği

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()

Publisher.py örneği analiz edildiğinde, logs adından bir exchange tanımlanmıştır ve tipi fanout‘tur. Ardından bu exchange’e mesaj gönderilirken basic_publish metodunun parametresi olan exchange,  logs atanmıştır.

Geçici Kuyruklar

Önceki örneklerde publisher tarafından bir kuyruk tanımlanmak ve tüm worker/consumer’lar aynı isimli kuyruk’u dinlemekteydi. Bu örnekte ise kuyruk’un RabbitMQ tarafından oluşturulması ve mesaj gönderildikten sonra silinmesi talep edilmektedir.

result = channel.queue_declare(exclusive=True)

Kuyruk tanımlama fonksiyonu çağırıldığında RabbitMQ, kuyruğa random bir isim verecektir. Ayrıca kuyruk dinleme işlemi bittiğinde ise bu kuyruğun silinmesini exclusive=True sağlamaktayız.

Consumer.py kodları

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

Kuyruktan mesajı okuyacak olan consumer.py kodları incelendiğinde şimdiye kadarki örneklere nazaran biraz daha farklı bir yapı bulunmaktadır. En önemli değişiklik queue_bind metodudur. Bu metodun çağırılması ile exchange, mesajları belirtilen queue’a aktarmaya başlayacaktır.

DEMO

Soldaki ekranda 2 adet consumer dinleme modundadır. Sağ taraftaki terminalden ise kuyruğa mesaj gönderilmektedir. Bu mesajı kuyruğu dinlemekte olan 2 consumer’a da gönderilmiştir.

rabbitmq log

 

Referans : Bu yazı serisinin hazırlanmasında kullanılan en önemli kaynak rabbitmq dökümantasyonudur. Bu dökümantasyona aşağıdaki linkten ulaşabilirsiniz.

http://www.rabbitmq.com/getstarted.html