← Blog

Patterns for Writing Efficient Queue Consumers

If you’ve built a queue consumer recently, there’s a good chance it was AI-generated or at least AI-assisted. And the default shape of that code is almost always a tight poll-process loop with chunked Promise.all concurrency. It works, but it doesn’t scale.

I was doing a short project recently that involved optimizing queue throughput, and it reminded me how easy it is to miss these patterns, especially when the code looks reasonable at first glance. These are some generic patterns that are good for writing efficient queue consumers.

All examples are in TypeScript and generalized. They apply whether you’re consuming from SQS, RabbitMQ, Kafka, or any pull-based queue.

Pattern 1: Decouple Polling from Processing

The default queue consumer looks like this:

async function consumeMessages(queue: MessageQueue) {
  while (true) {
    const messages = await queue.poll(); // network round-trip

    if (!messages.length) {
      await sleep(500);
      continue;
    }

    // Process each message before polling again
    for (const msg of messages) {
      await processMessage(msg);
    }
  }
}

While you’re processing, you’re not polling. While you’re polling, you’re not processing. 10 messages at 100ms each means a full second of no new work being fetched.

Split them into independent loops connected by a bounded buffer:

async function startConsumer(queue: MessageQueue) {
  const buffer: Message[] = [];
  const maxBufferSize = 50;
  let bufferNotEmpty = createSignal();
  let bufferNotFull = createSignal();

  // Poller: fills the buffer independently
  async function pollLoop() {
    while (true) {
      if (buffer.length >= maxBufferSize) {
        await bufferNotFull.wait(); // backpressure
      }

      const messages = await queue.poll();

      if (!messages.length) {
        await sleep(500);
        continue;
      }

      buffer.push(...messages);
      bufferNotEmpty.signal();
    }
  }

  // Processor: drains the buffer independently
  async function processLoop() {
    while (true) {
      if (buffer.length === 0) {
        await bufferNotEmpty.wait();
      }

      const msg = buffer.shift()!;
      if (buffer.length < maxBufferSize) {
        bufferNotFull.signal();
      }

      await processMessage(msg);
    }
  }

  // Run both concurrently
  await Promise.all([pollLoop(), processLoop()]);
}

This is the producer-consumer pattern applied within the consumer itself. The poller pre-fetches while the processor works, and the bounded buffer provides natural backpressure. When it’s full, the poller waits. No dead time, no unbounded growth.

On I/O-bound consumers, this alone can double throughput.

Pattern 2: Semaphore Concurrency Over Batch Chunking

The next thing you’ll reach for is concurrent processing. The intuitive approach is chunking:

const BATCH_SIZE = 5;

async function processLoop(buffer: Message[]) {
  while (true) {
    const batch = buffer.splice(0, BATCH_SIZE);

    if (!batch.length) {
      await sleep(100);
      continue;
    }

    // Process the entire batch concurrently, wait for ALL to finish
    await Promise.all(batch.map(msg => processMessage(msg)));
  }
}

This has a flaw called the convoy effect. Promise.all waits for the slowest message in the batch. If four messages finish in 50ms but one takes 2 seconds, those four slots sit idle until the slow one completes.

Batch 1: [msg1: 50ms, msg2: 50ms, msg3: 50ms, msg4: 2000ms, msg5: 50ms]
          ├── msg1 done at 50ms   (idle for 1950ms)
          ├── msg2 done at 50ms   (idle for 1950ms)
          ├── msg3 done at 50ms   (idle for 1950ms)
          ├── msg5 done at 50ms   (idle for 1950ms)
          └── msg4 done at 2000ms
Batch 2: starts at 2000ms ← 4 slots wasted for nearly 2 seconds

Use a semaphore instead. Each message starts processing as soon as a slot opens:

async function processLoop(buffer: Message[]) {
  const semaphore = new Semaphore(5); // 5 concurrent slots
  const inFlight = new Set<Promise<void>>();

  while (true) {
    if (buffer.length === 0) {
      await sleep(50);
      continue;
    }

    const msg = buffer.shift()!;

    const task = (async () => {
      await semaphore.acquire();
      try {
        await processMessage(msg);
      } finally {
        semaphore.release();
      }
    })();

    inFlight.add(task);
    task.finally(() => inFlight.delete(task));
  }
}
Semaphore(5):
  t=0ms:     msg1-msg5 start (5 slots full)
  t=50ms:    msg1,2,3,5 done → msg6,7,8,9 start immediately
  t=100ms:   msg6,7,8,9 done → msg10,11,12,13 start
  t=2000ms:  msg4 finally done → msg14 starts

One slow message under a semaphore penalizes zero other messages. With chunked batching, it penalizes the entire batch. The semaphore acts as a bounded worker pool without synchronized batch boundaries.

A simple Semaphore implementation:

class Semaphore {
  private permits: number;
  private waitQueue: Array<() => void> = [];

  constructor(permits: number) {
    this.permits = permits;
  }

  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
    return new Promise<void>(resolve => {
      this.waitQueue.push(resolve);
    });
  }

  release(): void {
    const next = this.waitQueue.shift();
    if (next) {
      next();
    } else {
      this.permits++;
    }
  }
}

Pattern 3: Priority-Aware Buffering

If your queue has messages at different priorities (real-time updates vs. bulk backfill), a plain FIFO buffer treats them all equally. You could run separate consumers per priority tier, but then high-priority capacity sits idle when there’s no urgent work.

A tiered buffer gives you shared capacity with priority-aware drain ordering:

type Priority = "high" | "normal" | "low";

const DRAIN_ORDER: Priority[] = ["high", "normal", "low"];

class TieredBuffer<T extends { priority: Priority }> {
  private queues: Map<Priority, T[]>;
  private capacity: Semaphore;
  private hasItems = createSignal();

  constructor(maxSize: number) {
    this.queues = new Map(
      DRAIN_ORDER.map(p => [p, []])
    );
    this.capacity = new Semaphore(maxSize);
  }

  async put(item: T): Promise<void> {
    await this.capacity.acquire(); // blocks when buffer is full
    this.queues.get(item.priority)!.push(item);
    this.hasItems.signal();
  }

  async get(): Promise<T> {
    while (true) {
      for (const priority of DRAIN_ORDER) {
        const queue = this.queues.get(priority)!;
        if (queue.length > 0) {
          return queue.shift()!;
        }
      }
      await this.hasItems.wait();
    }
  }

  release(): void {
    this.capacity.release(); // free slot after processing completes
  }
}

Three things to note:

  1. Shared capacity, not partitioned. All priorities share the same slots. Low-priority work uses the full buffer when high-priority traffic is quiet.
  2. Priority on drain, not insert. get() always checks the highest tier first. Even if 90% of the buffer is low-priority, one high-priority message that arrives gets processed next.
  3. Deferred capacity release. The semaphore is acquired on put() but released after processing via release(), not on get(). If you release on dequeue, the poller pulls messages unboundedly because the buffer always looks like it has space. You end up with thousands of in-flight messages you can’t actually process.

Here’s how it plugs into the decoupled architecture from Pattern 1:

async function startConsumer(queue: MessageQueue) {
  const buffer = new TieredBuffer<Message>(100);
  const semaphore = new Semaphore(15);
  const inFlight = new Set<Promise<void>>();

  async function pollLoop() {
    while (true) {
      const messages = await queue.poll();

      if (!messages.length) {
        await sleep(200);
        continue;
      }

      for (const msg of messages) {
        await buffer.put(msg); // blocks at capacity
      }
    }
  }

  async function processLoop() {
    while (true) {
      const msg = await buffer.get(); // highest priority first

      const task = (async () => {
        await semaphore.acquire();
        try {
          await processMessage(msg);
        } finally {
          semaphore.release();
          buffer.release(); // free buffer slot AFTER processing
        }
      })();

      inFlight.add(task);
      task.finally(() => inFlight.delete(task));
    }
  }

  await Promise.all([pollLoop(), processLoop()]);
}

The Full Picture

[Queue] → [Poller] → [TieredBuffer] → [Processor]
             │         high > normal > low    │
             │         shared capacity     Semaphore(N)
             │         backpressure on put()  │
        independent                    independent
        async loop                     async loop

A few practical notes:

  • Buffer size: 3-6x your concurrency is a good starting point. Too small and the processor starves between polls. Too large and you’re hoarding messages that other consumer instances could be processing.
  • Downstream limits matter. If your handler hits a database, keep semaphore concurrency within your connection pool size. 15 concurrent against a pool of 20 gives you headroom. 50 against the same pool gives you connection exhaustion.
  • Short-poll when checking multiple queues sequentially. If you round-robin across priority tiers, long-polling on each is a trap. Empty queues burn their full wait time before falling through to the next one. Short-polling returns immediately, and your cycle time drops from seconds to milliseconds.
  • Release buffer capacity after processing, not at dequeue. This is the easiest mistake to make with bounded buffers, and it completely defeats backpressure.