Haciendo del Desarrollo y la Arquitectura Web, ciencia y pasión.

Arquitectura orientada a eventos

Anteriormente, ya habíamos hablado de patrones orientados a eventos, habíamos puesto un Kafka en funcionamiento, con algunos publicadores y suscriptores. Pues bien, hoy vamos a profundizar en un escenario puramente orientado a eventos. Primero, explicaré cómo se comporta un sistema orientado a eventos, las ventajas que tiene y también comentaré los inconvenientes, que, por supuesto, existen. Después, como siempre, haremos un poco de código, que luego colgaré por ahí.
Esta arquitectura tiene muchos nombres: EDA (por sus siglas en inglés, Event-Driven Architecture), arquitectura asíncrona, que resalta el carácter asíncrono de esta metodología, o arquitectura de mensajería, cuando se utilizan colas de mensajes. Esta arquitectura se basa en un patrón del que ya hablamos en otro artículo: el productor-suscriptor, donde hay al menos dos entidades: una que produce eventos y la otra que los consume y procesa a su propio ritmo.

Aquí encontramos la primera propiedad clave: el bajo acoplamiento, una característica siempre deseable. Esto nos permite separar entornos y tecnologías, es decir, tenemos un bus de mensajes, y ambos, productores y consumidores, pueden correr desde organizaciones en regiones distintas. Por otro lado, imaginemos un sistema que produce y consume eventos, desarrollado en Java. En un momento dado, podemos añadir consumidores en Python que convivan con el sistema original. Esto facilitaría la migración de una tecnología obsoleta a otra más actual sin grandes complicaciones.

Además, me parece muy interesante que cada uno de los sistemas pueda iniciarse según sus propias necesidades. Por ejemplo un comsumidor puede trabajara su propio ritmo, sin importar a que velocidad se publiquen los eventos. Si tenemos un sistema crítico, podemos asignarle una máquina más potente que trabajará según lo requiera. Pero aún mejor es el hecho de que, si un consumidor falla, el resto del sistema seguirá funcionando, y cuando dicho consumidor vuelva a la normalidad, podrá recuperar los eventos pendientes.

El patrón productor-suscriptor es una alternativa al polling, ya que permite realizar actualizaciones en tiempo real sin necesidad de consultas repetitivas. En el polling, un cliente consulta periódicamente: Hay algo? Hay algo? Hay algo?, lo que genera una carga innecesaria de tráfico en los sistemas y las redes.

Gracias a este patrón, logramos una mayor eficiencia, realizando comunicaciones únicamente cuando hay cambios. Además, conseguimos una latencia cercana al tiempo real, algo que el polling no puede ofrecer. No es exactamente en tiempo real, ya que, lógicamente, hay una pequeña latencia desde que se emite el evento hasta que el consumidor lo extrae del bus de eventos. Con polling, podríamos alcanzar una actualización casi en tiempo real, pero siempre con un compromiso entre velocidad y carga del sistema.

Otra ventaja importante es que el sistema es altamente escalable horizontalmente. La asincronía permite que el consumo de eventos se realice en diferido por un número variable de consumidores. Esto significa que podemos añadir tantos consumidores como sea necesario para mantener los tiempos dentro de un rango aceptable.

Podriamos hacer algunos cálculos para medir la eficiencia del sistema. Usando herramientas de investigación operativa, como la teoría de colas, podemos modelar la tasa de llegada de eventos, los tiempos de espera, el número de productores y consumidores, así como los tiempos de procesamiento. De esta manera, podemos determinar cuántos consumidores serían necesarios para que el tiempo de espera en el sistema no supere cierto umbral. Cuando hagamos el caso práctico, quizás podamos predecir su comportamiento. Súper interesante!

Peeero... siempre hay un pero. Todo funciona bien cuando todo va bien. Sin embargo, si alguna pieza del sistema falla, la trazabilidad se complica, y verificar las transacciones completadas se vuelve más difícil. Lo mismo ocurre con el rollback de operaciones.

Imaginemos, por ejemplo, una transacción económica: un cliente realiza una compra y varios consumidores procesan el evento: uno lo registra en el log, otro gestiona la facturación, otro envía la orden al almacén y otro envía la notificación al cliente. Ahora, supongamos que hay un corte de luz y todos los consumidores o el bus de eventos fallan. Nos encontraríamos con un problema: cómo sabemos qué eventos han sido procesados y cuáles no?

Para mitigar estos riesgos, se pueden emplear varias técnicas:

Usar un broker de mensajes persistente: Al reiniciarse, el sistema podría recuperar su estado anterior. Ejemplos de esto son Kafka o RabbitMQ.
Evitar el procesamiento duplicado: Es importante asegurarse de que un evento no se procese dos veces cuando el sistema se reinicie. Para ello, podemos diseñar el sistema de manera que sea tolerante a fallos, identificando cada evento de forma única.
Garantizar la idempotencia: Aplicar la misma acción dos veces debe producir el mismo resultado, evitando efectos no deseados en el sistema.
Monitoreo eficaz: Es clave detectar problemas como la ausencia de ACK en eventos iniciados o un incremento anormal en la latencia de la cola de mensajes.
Redundancia: Replicar consumidores en diferentes ubicaciones físicas puede mejorar la tolerancia a fallos.

Bueno, pues hasta aquí la teoría. Como en otras ocasiones, vamos a poner esto en práctica. He desarrollado un pequeño ejemplo usando redis como bus de mensajes. Perfecto. He puesto un productor que genera dos tipos de eventos los high y los low, bajo keys distintas event_queue_log y event_queue_high. Si el contador es impar es para el low y si es par para el high. Posteriromente tenemos un consumidor que invocaremos por linea de comandos, podemos pasarle un parametro high o low, y por tanto consumiran deacuerdo a éste.

Lo primero vamos a declarar el eventBus. Se encargará de la conexion al servidor Redis, así que antes habrá que ponerlo en marcha, con Docker lo ponemos en funcionamiento realmente rápido: 

docker pull redis
docker run redis

<?php
// src/EventBus.php

class EventBus {
private $redis;
private $defaultQueueKey;
private const PRIORITY_HIGH = 'high';
private const PRIORITY_LOW = 'low';

public function __construct($host = '127.0.0.1', $port = 6379, $defaultQueueKey = 'event_queue') {
$this->redis = new Redis();
$this->redis->connect($host, $port);
$this->defaultQueueKey = $defaultQueueKey;
}

public function publish($event, $data, $queueKey = null, $priority = self::PRIORITY_LOW) {
if ($queueKey === null) {
// Si no se especifica una cola, usar la prioridad para determinar la cola
$queueKey = $this->defaultQueueKey . '_' . $priority;
} else {
$queueKey = $queueKey . '_' . $priority;
}

print_r("queueKey: " . $queueKey . "\n");
$payload = json_encode(['event' => $event, 'data' => $data, 'priority' => $priority]);
$this->redis->rPush($queueKey, $payload); //es FIFO (First In First Out) cola
//Es útil para implementar colas de trabajo (FIFO) sin tener que hacer polling
}

public function subscribe($callback, $queueKey = null, $priority = null) {
if ($queueKey === null && $priority !== null) {
// Si no se especifica cola pero sí prioridad, usar la cola de esa prioridad
$queueKey = $this->defaultQueueKey . '_' . $priority;
} elseif ($queueKey === null) {
$queueKey = $this->defaultQueueKey . '_' . self::PRIORITY_LOW;
}

while (true) {
// BRPOP bloquea hasta que haya un mensaje disponible
$result = $this->redis->brPop([$queueKey], 0);
if ($result) {
$message = json_decode($result[1], true);
$callback($message);
}
}
}

public static function getPriorities() {
return [self::PRIORITY_HIGH, self::PRIORITY_LOW];
}
}
?>

Despues tenemos el productor. Como he comentado produce eventos de ambos tipos


<?php
// producer.php

require 'src/EventBus.php';

echo "Iniciando productor...\n";
$bus = new EventBus();
$counter = 0;

while (true) {
$counter++;
if ($counter % 2 == 0) {
$bus->publish('nuevo_evento', ['contador' => $counter, 'timestamp' => time()], 'event_queue', 'high');
} else {
$bus->publish('nuevo_evento', ['contador' => $counter, 'timestamp' => time()], 'event_queue', 'low');
}
echo "Evento publicado: $counter\n";
sleep(1);
}
?>

Y finalmente el consumidor:


<?php
// consumer.php

require 'src/EventBus.php';

// Get priority from command line argument, default to 'low' if not specified
$priority = isset($argv[1]) ? strtolower($argv[1]) : 'low';

// Validate priority input
if (!in_array($priority, ['low', 'high'])) {
die("Error: Priority must be either 'low' or 'high'\n");
}

echo "Esperando eventos de prioridad '$priority'...\n";
$bus = new EventBus();

$bus->subscribe(
function ($message) use ($priority) {
// Only process messages matching the specified priority
if (isset($message['priority']) && $message['priority'] === $priority) {
echo "Evento recibido: " . json_encode($message) . "\n";
}
},
null,
$priority
);?>


Ahora en dos (o mas) terminales distintas lanzamos un:

php consumer.php high
php consumer.php low
php producer.php

Y ya finalmente os muestro como resultó en mi local, lanzando a 2 productores y 3 consumidores:

Como siempre espero que os haya resultado tan interesante como a mi. Por cierto el codigo lo tenis aqui Hasta la proxima.