
Update an attribute in all Dynamo items in parallel. A practical use case for the Producer Consumer pattern.

As a developer who contributes to applications using Dynamo, I often have to update an attribute on, or add an attribute to, existing Dynamo items (i.e. backfilling) in one or more Dynamo partitions. These partitions typically have millions of items, and since a single Dynamo transaction can update at most 100 items, backfilling an attribute one batch at a time can be very time-consuming (many hours).

Fortunately, there is a relatively easy way to alleviate this problem. Inspired by the Producer Consumer pattern, the TypeScript script below iterates through every given partition key and creates many readers and writers to update Dynamo items in parallel.

import { DocumentClient } from 'aws-sdk/clients/dynamodb';

const TABLE_NAME = "MainTable";
const PKS = [
  'email_PK',
  'user_PK',
  'label_PK'
];
const field = 'touchedAt';
const value = new Date().toISOString();

const WRITERS = 26 * 10;
const ITEM_BATCH_SIZE = 100;
const QUEUE_THRESHOLD_SIZE = 100000;

const ALPHABETS = Array.from(Array(26)).map((_, i) =>
  String.fromCharCode(97 + i)
);

class ProducerConsumerSystem<T> {
  private producers: AsyncGenerator<T, void, unknown>[] = [];

  private producersDone = false;

  private consumers: ((items: T[]) => Promise<void>)[] = [];

  private batchSize: number;

  private queue: T[] = [];

  private queueThreshold: number;

  constructor(batchSize = 25, queueThreshold = 100) {
    this.batchSize = batchSize;
    this.queueThreshold = queueThreshold;
  }

  addProducer(producer: AsyncGenerator<T, void, unknown>) {
    this.producers.push(producer);
  }

  addConsumer(consumer: (items: T[]) => Promise<void>) {
    this.consumers.push(consumer);
  }

  async run() {
    const productions = this.producers.map((producer) =>
      this.produceWith(producer)
    );
    const consumptions = this.consumers.map((consumer) =>
      this.consumeWith(consumer)
    );
    await Promise.all(productions);
    this.producersDone = true;
    await Promise.all(consumptions);
  }

  private async produceWith(producer: AsyncGenerator<T, void, unknown>) {
    for await (const items of producer) {
      while (this.queue.length >= this.queueThreshold) {
        await new Promise((res) => setTimeout(res, 100));
      }
      this.queue.push(...(Array.isArray(items) ? items : [items]));
    }
  }

  private async consumeWith(consumer: (items: T[]) => Promise<void>) {
    while (this.queue.length > 0 || !this.producersDone) {
      if (this.queue.length === 0) {
        await new Promise((res) => setTimeout(res, 50));
        continue;
      }

      const batch = this.queue.splice(0, this.batchSize);
      await consumer(batch);
    }
  }
}

async function batchTouchItems(
  ddb: DocumentClient,
  tableName: string,
  items: DocumentClient.AttributeMap[]
) {
  if (items.length === 0) return;
  const batchSize = items.length;
  const { pk } = items[0];
  const itemsUpdates = items.map((item) => {
    return {
      Update: {
        TableName: tableName,
        Key: {
          pk,
          sk: item.sk,
        },
        UpdateExpression: 'SET #field = :value',
        ExpressionAttributeNames: {
          '#field': field,
        },
        ExpressionAttributeValues: {
          ':value': value,
        },
      },
    };
  });
  try {
    console.log(`Updating ${batchSize} ${pk} items`);
    await ddb
      .transactWrite({
        TransactItems: itemsUpdates,
      })
      .promise();
    console.log(`Updated ${batchSize} ${pk} items`);
  } catch (e) {
    if (batchSize === 1) {
      const item = items[0];
      console.error(`Failed to update item ${pk} ${item.sk}:`, e);
      return;
    }
    console.log(
      `Failed to update a batch of ${batchSize} ${pk} items. Splitting the batch into two halves and retrying...`
    );
    const midIndex = Math.ceil(items.length / 2);
    await batchTouchItems(ddb, tableName, items.slice(0, midIndex));
    await batchTouchItems(ddb, tableName, items.slice(midIndex));
  }
}

async function* readItems(
  ddb: DocumentClient,
  tableName: string,
  pk: string,
  queryParams?: Partial<DocumentClient.QueryInput>
): AsyncGenerator<DocumentClient.AttributeMap[], boolean> {
  let exclusiveStartKey: DocumentClient.Key | undefined;

  while (true) {
    const res: DocumentClient.QueryOutput = await ddb
      .query({
        TableName: tableName,
        // KeyConditionExpression: '#pk = :pk',
        // ExpressionAttributeNames: { '#pk': 'pk', '#field': field },
        // ExpressionAttributeValues: { ':pk': pk, ':value': value },
        ExclusiveStartKey: exclusiveStartKey,
        ...queryParams,
      })
      .promise();

    // Keep only items from the partition this reader was asked to read.
    const items = (res.Items || []).filter((item) => item.pk === pk);

    console.log(`Read ${items.length} ${pk} items`);

    if (items.length === 0) break;

    yield items;

    if (res.LastEvaluatedKey) {
      exclusiveStartKey = res.LastEvaluatedKey;
    } else {
      break;
    }
  }

  return true;
}

async function touchPartition(
  ddb: DocumentClient,
  tableName: string,
  pk: string
) {
  const producerConsumerSystem = new ProducerConsumerSystem<
    DocumentClient.AttributeMap[]
  >(ITEM_BATCH_SIZE, QUEUE_THRESHOLD_SIZE);
  ALPHABETS.forEach((alphabet) => {
    // Each reader covers the sort keys that start with one lowercase letter.
    // begins_with matches every key with that prefix (including keys after 'z')
    // and keeps the readers' key ranges from overlapping.
    const readItemsStartingWithAlphabet = async function* () {
      yield* readItems(ddb, tableName, pk, {
        KeyConditionExpression:
          '#pk = :pk AND begins_with(#sk, :startAlphabet)',
        ExpressionAttributeNames: { '#pk': 'pk', '#sk': 'sk' },
        ExpressionAttributeValues: {
          ':pk': pk,
          ':startAlphabet': alphabet,
        },
      });
    };
    producerConsumerSystem.addProducer(readItemsStartingWithAlphabet());
  });
  Array(WRITERS)
    .fill(0)
    .forEach(() =>
      producerConsumerSystem.addConsumer(
        (batch: DocumentClient.AttributeMap[]) =>
          batchTouchItems(ddb, tableName, batch)
      )
    );
  return producerConsumerSystem.run();
}

async function touchDdbPartitions(
  ddb: DocumentClient,
  tableName: string,
  pks: string[]
) {
  for (const pk of pks) {
    await touchPartition(ddb, tableName, pk);
  }
}

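// DocumentClient is imported directly at the top of the script.
const ddb = new DocumentClient();
touchDdbPartitions(ddb, TABLE_NAME, PKS).catch((e) => {
  console.error('Backfill failed:', e);
  process.exitCode = 1;
});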

The sort keys in the applications I typically contribute to are unique alphabetical IDs. To parallelize reads, the script starts 26 readers for every partition and configures each reader to read the items whose sort key starts with a specific letter. Since a writer can update only 100 items at once, the script also starts a configurable number of writers (10 in my case) for every reader.
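
The reader and writer arrangement described above can be tried without a Dynamo table. The sketch below is a toy, in-memory version of the same idea: `fakeReader`, `runDemo`, the `Item` type, and the page counts are made-up names and numbers for illustration, and the timers merely stand in for network calls. It is not the script itself, only a small model of how several readers feed one shared queue while writers drain it in batches.

```typescript
// Toy model of the readers and writers: no DynamoDB involved.
type Item = { sk: string };

// Hypothetical reader: yields pages of fake items whose sort key starts with `letter`.
async function* fakeReader(
  letter: string,
  pages: number,
  pageSize: number
): AsyncGenerator<Item[]> {
  for (let p = 0; p < pages; p++) {
    await new Promise((res) => setTimeout(res, 1)); // stands in for a query
    yield Array.from({ length: pageSize }, (_, i) => ({
      sk: `${letter}-${p}-${i}`,
    }));
  }
}

// Starts one reader per letter and `writersPerReader` writers per reader.
// Resolves with the size of every batch the writers handled.
async function runDemo(
  letters: string[],
  writersPerReader: number,
  batchSize: number
): Promise<number[]> {
  const queue: Item[] = [];
  const batchSizes: number[] = [];
  let readersDone = false;

  // Every reader pushes its pages (3 pages of 7 items each) onto the shared queue.
  const readers = letters.map(async (letter) => {
    for await (const page of fakeReader(letter, 3, 7)) {
      queue.push(...page);
    }
  });

  // Every writer repeatedly takes up to `batchSize` items off the queue.
  const writers = Array.from(
    { length: letters.length * writersPerReader },
    async () => {
      while (queue.length > 0 || !readersDone) {
        if (queue.length === 0) {
          await new Promise((res) => setTimeout(res, 1)); // wait for readers
          continue;
        }
        const batch = queue.splice(0, batchSize);
        await new Promise((res) => setTimeout(res, 1)); // stands in for a write
        batchSizes.push(batch.length);
      }
    }
  );

  await Promise.all(readers);
  readersDone = true;
  await Promise.all(writers);
  return batchSizes;
}
```

Calling `runDemo(['a', 'b', 'c'], 10, 100)` mirrors the script's ratio of 10 writers per reader on a three-letter keyspace. Because JavaScript runs the callbacks on a single thread, `queue.splice` never hands the same item to two writers, which is why the shared array needs no lock.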

If the application you’re contributing to has a different type of sort key (say a createdAt timestamp), you can consider dividing a partition between readers by day or month (or opt for a better division strategy, if there is one).
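
As a rough sketch of the month-based division, the helper below splits a time window into calendar-month ranges that could each be handed to one reader. It assumes the sort key is an ISO 8601 UTC timestamp string (so string order matches time order). `monthRanges`, `rangeQueryParams`, and `SortKeyRange` are hypothetical names introduced here, not part of the script.

```typescript
// Hypothetical shape of one reader's slice of the sort-key space.
interface SortKeyRange {
  start: string; // inclusive lower bound, ISO 8601 UTC
  end: string; // inclusive upper bound, ISO 8601 UTC
}

// Splits the window into calendar months (UTC). The first range starts at the
// beginning of `start`'s month, and the last range ends at the beginning of
// the month after the one that contains `end`, so the window is fully covered.
function monthRanges(start: Date, end: Date): SortKeyRange[] {
  const ranges: SortKeyRange[] = [];
  let cursor = new Date(
    Date.UTC(start.getUTCFullYear(), start.getUTCMonth(), 1)
  );
  while (cursor.getTime() < end.getTime()) {
    // Date.UTC rolls month 12 over into January of the next year.
    const next = new Date(
      Date.UTC(cursor.getUTCFullYear(), cursor.getUTCMonth() + 1, 1)
    );
    ranges.push({ start: cursor.toISOString(), end: next.toISOString() });
    cursor = next;
  }
  return ranges;
}

// Builds query parameters for one range, in the shape the script passes to
// its readers. Assumes the key attributes are named 'pk' and 'sk'.
function rangeQueryParams(pk: string, range: SortKeyRange) {
  return {
    KeyConditionExpression: '#pk = :pk AND #sk BETWEEN :start AND :end',
    ExpressionAttributeNames: { '#pk': 'pk', '#sk': 'sk' },
    ExpressionAttributeValues: {
      ':pk': pk,
      ':start': range.start,
      ':end': range.end,
    },
  };
}
```

BETWEEN is inclusive on both ends, so an item stamped exactly at a month boundary would be read by two neighbouring readers. Since the update sets the same value each time, touching such an item twice should be harmless.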

The concurrency enabled by the Producer Consumer pattern reduces the script’s execution time from hours to minutes. The script saves me hours every month. I hope you find it useful as well.
