RabbitMQ ile Görevleri Zamanlayın

Photo by Hal Gatewood on Unsplash

RabbitMQ, uygulamanıza yapmak istediğiniz işleri sıraya alarak, asenkron bir şekilde çalışmanıza yardımcı olan ve kaynak kodu herkese açık bir mesaj kuyruklama aracıdır.

Nasıl çalıştığını size şöyle anlatabilirim: Sitenize üye olan kullanıcılarınıza “Hoşgeldiniz” diyen bir e-posta gönderdiğinizi düşünelim. Eğer bu işlemi kayıt esnasında çalışan bir iş parçası olarak tasarladıysanız, kullanıcınız uygulamanızın e-postayı göndermek için çalıştığı süre kadar bekleyecek. Fakat işlemi kuyruğa alırsanız kullanıcınız kayıt olup işlemlerine devam ederken, RabbitMQ e-posta gönderme işlemini uygulamanızın kulağına sessizce fısıldayacak.

Bu sayede bazı kazanımlar elde edeceksiniz;

  • Kullanıcınız daha az beklediği için daha kaliteli bir deneyim sağlayacak.
  • Uygulamanız işlemi anlık bir şekilde yapmak için kaynak sarfetmeyecek ve daha çok isteğe cevap verebilecek.
  • E-posta gönderiminden kaynaklanabilecek olası bir hata durumunda uygulamanızın akışı doğrudan etkilenmeyecek.

Peki biz Zingat’ta neden mesajları sırayla değil de zamanlayarak göndermek istedik?

Normalde RabbitMQ’ya teslim ettiğiniz her mesaj uygulamanızdaki yoğunluğa göre sırası gelince işlenir ve bir çıktı üretilir. Fakat bizim mesajlarımızı zamanlamamız gerekti çünkü kullandığımız üçüncü parti servislere gecenin yarısında binlerce istek gönderiyoruz, bu durumda kullandığımız servisin isteklerimizin hepsine yanıt vermesi zorlaşıyor ve zaman zaman da yanıt alamıyoruz. Bu sorunu bir ölçüde aşabilmek için böyle bir yol izlemenin faydalı olabileceğini düşündük. Faydalı da oldu!

Öncesinde RabbitMQ’nun böyle bir özelliğe destek vermediğini düşünüyorduk hatta kendimiz kuyruğa gönderirken zamanlamayı filan düşündük ki bu da az mantıklı sayılmazdı. Biraz daha araştırdığımızda bunun bir eklenti ile yapılabildiğini gördük ve kolları sıvadık.

Bu iş için “RabbitMQ Delayed Message” eklentisini kurmak gerekiyor. http://www.rabbitmq.com/community-plugins.html adresinden eklentiyi indirebilirsiniz.

Download linkine tıklayarak eklenti dosyasını indirebilirsiniz (.ez uzantılı)

Eklenti dosyasını zipten çıkarıp .ez uzantılı dosyayı RabbitMQ plugins klasörüne kopyalayın. Dizin adresini öğrenmek için aşağıdaki komutu kullanabilirsiniz. (Plugin Archives dizinini kopyalamalısınız)

Eklentiyi dizine kopyaladıktan sonra list komutuyla durumlarını gözlemleyebilirsiniz.

Gördüğünüz gibi RabbitMQ eklentiyi tanıdı fakat pasif halde bekliyor.

Şimdi sıra eklentiyi etkinleştirmekte. Bu işlem için aşağıdaki komutu kullanabilirsiniz.

Eklenti artık aktif ve göreve hazır

Artık eklenti çalışır durumda. Şimdi RabbitMQ yönetim arayüzünden x-delayed-message türünde bir exchange oluşturmamız lazım.

Eğer arayüzün varsayılan adresi olan http://localhost:15672 açılmazsa yönetim arayüzü için etkileştirmeniz gereken eklentiler var demektir. Bu işlem için rabbitmq_management eklentisini aktif etmeniz gerekiyor.

Bu eklenti RabbitMQ için bir yönetim arayüzü sağlıyor, süper!

Exchanges sekmesindeki add a new exchange bölümünden yeni bir exchange ekliyoruz tipini x-delayed-exchange seçip argümanlar kısmına x-delayed-type = topic yazıp kaydediyoruz.

Ardından yeni bir kuyruk oluşturuyoruz, mesajlarımızı bu kuyruğa göndereceğiz. Ben amacına uygun olsun diye scheduled-queue dedim, siz farklı isimler kullanabilirsiniz.

Sonra bu kuyruğa bir routing key ve bir exchange bind etmemiz gerekiyor.

Oluşturduğunuz kuyruk ismine tıklayarak detayına ulaşabilirsiniz.

Hepsi bu kadar.

Artık tek yapmamız gereken uygulama içinde mesajlarımızı zamanlanmış kuyruğa gönderirken x-delay adında bir header parametresine milisaniye cinsinden değer eklemek.

Biz PHP tabanlı projemizde RabbitMQ istemcisi olarak php-amqplib/php-amqplib paketini kullanıyoruz, bu paketi kullanarak zamanlanmış bir mesaj göndermek isteseydik şöyle bir kod yazmamız gerekecekti.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php
public function scheduledPublish()
    {
        $headers = new AMQPTable();
        $delay = 60000; // 1 minute
        $headers->set('x-delay', $delay, AMQPTable::T_INT_LONG);
        $exchangeName = 'delayed';
        $routingKey = 'delayed-messages';
        $properties = [
            'content_type' => 'text/plain',
            'delivery_mode' => 2
        ];
        $payload = [
            'message' => 'A scheduled RabbitMQ message'
        ];
        $msg = new AMQPMessage(json_encode($payload, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE), $properties);
        $msg->set('application_headers', $headers);
        try {
            $this->channel->basic_publish($msg, $exchangeName, $routingKey, true);
            $this->channel->wait_for_pending_acks();
        } catch (\Exception $e) {
            return false;
        }
        return true;
    }

Okuduğunuz için teşekkür ederim.