New storage system for Apify request queue: what you need to know

Find out about the new Apify storage system for request queues and learn about its new features and implementation details, such as batch operations and the locking mechanism.


Apify recently released a new storage system for the Apify request queue. The new implementation unlocks new use cases that were not previously possible and introduces new features. In this post, we look at a brief history of the storage system, new features, and implementation details.

Web scraping queue

Apify’s request queue is a cloud-based storage system designed for web scraping queues. It enables developers to enqueue and retrieve requests, i.e., URLs with an HTTP method, payload, and other parameters. Request queues are essential not only in web crawling scenarios but also in other use cases that require managing a large number of URLs and continuously adding new links.

The storage system for request queues accommodates both breadth-first and depth-first crawling strategies and lets you attach custom data attributes to requests. It enables you to check whether certain URLs have already been encountered, add new URLs to the queue, and retrieve the next set of URLs for processing.
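To make these three operations concrete, here is a minimal in-memory sketch. It is an illustration only, not how Apify stores requests: the class name InMemoryRequestQueue and its method names are made up for this example, and the assumption is that a request is identified by its uniqueKey, falling back to its URL.

```javascript
// Minimal in-memory model of a request queue (illustration only).
class InMemoryRequestQueue {
    constructor() {
        this.seen = new Map(); // uniqueKey -> request, remembers every URL ever added
        this.pending = [];     // uniqueKeys that are still waiting to be processed
    }

    // Adds a request unless its uniqueKey was already encountered.
    // forefront: true puts it at the head (depth-first), the default is the tail (breadth-first).
    addRequest(request, { forefront = false } = {}) {
        const uniqueKey = request.uniqueKey ?? request.url;
        if (this.seen.has(uniqueKey)) return { uniqueKey, wasAlreadyPresent: true };

        this.seen.set(uniqueKey, { method: 'GET', ...request, uniqueKey });
        if (forefront) this.pending.unshift(uniqueKey);
        else this.pending.push(uniqueKey);
        return { uniqueKey, wasAlreadyPresent: false };
    }

    // Removes up to `limit` requests from the head of the queue and returns them.
    fetchNext(limit = 1) {
        return this.pending.splice(0, limit).map((key) => this.seen.get(key));
    }
}

const queue = new InMemoryRequestQueue();
queue.addRequest({ url: 'http://example.com/a' });
queue.addRequest({ url: 'http://example.com/b' });
queue.addRequest({ url: 'http://example.com/c' }, { forefront: true });

// Adding the same URL again is detected as a duplicate.
const duplicate = queue.addRequest({ url: 'http://example.com/a' });

// The request added with forefront: true is returned first.
const firstTwo = queue.fetchNext(2).map((request) => request.url);
```

Adding to the tail gives a breadth-first crawl, while adding to the head makes the most recently discovered links run first, which is what a depth-first crawl needs.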

Limitations of the previous storage system

Since the initial release of the request queue, our users have faced some limitations and missing features of the storage system. This forced them to create workarounds or use other storage types available on the Apify platform. During this period, we collected feature requests from customers, and we wanted to support them out of the box.

Main limitations:

  • Throughput issues: Due to limited throughput in operations and lack of batch operations, the initialization of large-scale scraping jobs was challenging anytime there was a need to pre-enqueue hundreds of thousands of requests at once at the beginning of the job.
  • Fixed data retention policies: Users had no control over the data retention of requests in the queue. Each request added to the queue had an expiration date set to the user's currently applied data retention period, making incremental scraping very difficult.
  • Concurrency limitations: The original system didn't include a lock on currently processed requests. Scraping one request queue from multiple Actor runs could lead to duplicate results, and users needed to handle this issue in post-processing.

Introduction to the new storage system

The enhancements introduced by the new implementation not only address these limitations but also add capabilities that support more complex and demanding use cases. The Apify tooling, including Crawlee and the Apify SDK for JavaScript, incorporates all these features, so users can take advantage of them without extra configuration.

Distributed scraping

The system now incorporates a locking mechanism that prevents multiple clients from processing the same request simultaneously. This logic was previously located in the client library, so only a single client could process a single queue. This feature is crucial for distributed scraping tasks, where numerous instances are processing the queue concurrently. By locking requests during processing, the system ensures data integrity, exactly-once processing, and operational reliability.

The following code example demonstrates how to use the locking mechanism with the Apify Client for JavaScript. This feature is integrated into Crawlee, requiring minimal extra setup. For more details, refer to the Crawlee documentation.

import { Actor, ApifyClient } from 'apify';

await Actor.init();

const client = new ApifyClient({
    token: 'MY-APIFY-TOKEN',
});

// Creates a new request queue.
const requestQueue = await client.requestQueues().getOrCreate('example-queue');

// Creates two clients with different keys for the same request queue.
const requestQueueClientOne = client.requestQueue(requestQueue.id, { clientKey: 'requestqueueone' });
const requestQueueClientTwo = client.requestQueue(requestQueue.id, { clientKey: 'requestqueuetwo' });

// Adds multiple requests to the queue.
await requestQueueClientOne.batchAddRequests([
    { url: 'http://example.com/foo', uniqueKey: 'http://example.com/foo', method: 'GET' },
    { url: 'http://example.com/bar', uniqueKey: 'http://example.com/bar', method: 'GET' },
    { url: 'http://example.com/baz', uniqueKey: 'http://example.com/baz', method: 'GET' },
    { url: 'http://example.com/qux', uniqueKey: 'http://example.com/qux', method: 'GET' },
]);

// Locks the first two requests at the head of the queue.
const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead({
    limit: 2,
    lockSecs: 60,
});

// Other clients cannot list and lock these requests; the listAndLockHead call returns other requests from the queue.
const processingRequestsClientTwo = await requestQueueClientTwo.listAndLockHead({
    limit: 2,
    lockSecs: 60,
});

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0];
const requestLockedByClientOne = await requestQueueClientOne.getRequest(theFirstRequestLockedByClientOne.id);
console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`);

// Other clients cannot modify the lock; attempting to do so will throw an error.
try {
    await requestQueueClientTwo.prolongRequestLock(theFirstRequestLockedByClientOne.id, { lockSecs: 60 });
} catch (err) {
    // This will throw an error.
}

// Prolongs the lock of the first request or unlocks it.
await requestQueueClientOne.prolongRequestLock(theFirstRequestLockedByClientOne.id, { lockSecs: 60 });
await requestQueueClientOne.deleteRequestLock(theFirstRequestLockedByClientOne.id);

// Cleans up the queue.
await requestQueueClientOne.delete();

await Actor.exit();

Example code using request locking
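Building on the calls shown above, a single worker step could look like the following sketch. It is not taken from Crawlee or the Apify SDK. The function name processLockedBatch and the handleRequest callback are hypothetical, the queue client is passed in as a parameter, and the sketch assumes that a request is marked as handled by updating it with a handledAt timestamp.

```javascript
// Locks a batch of requests, processes them, and either marks each request
// as handled or releases its lock so that another client can pick it up.
// `queueClient` is expected to expose listAndLockHead, updateRequest,
// and deleteRequestLock, like the request queue clients in the example above.
async function processLockedBatch(queueClient, handleRequest, { limit = 10, lockSecs = 60 } = {}) {
    const { items } = await queueClient.listAndLockHead({ limit, lockSecs });
    const summary = { handled: [], released: [] };

    for (const request of items) {
        try {
            await handleRequest(request);
            // Assumption: setting handledAt marks the request as handled.
            await queueClient.updateRequest({ ...request, handledAt: new Date().toISOString() });
            summary.handled.push(request.id);
        } catch (err) {
            // Processing failed: release the lock instead of waiting for it to expire.
            await queueClient.deleteRequestLock(request.id);
            summary.released.push(request.id);
        }
    }

    return summary;
}
```

Several Actor runs can call a function like this against the same queue, each with its own clientKey. For long-running handlers, the lock would also need to be prolonged with prolongRequestLock before lockSecs elapses.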

Batch operations

To reduce network latency and enhance throughput, the new system supports batch operations. This allows multiple requests to be enqueued or dequeued in a single operation, streamlining the process and significantly reducing the time spent on network communication.

You can use batch operations with the Apify API, as well as with the Apify API clients for JavaScript and Python. Here is a short example of how to use batch operations with the JavaScript ApifyClient.

import { ApifyClient } from 'apify-client';

const client = new ApifyClient({
    token: 'MY-APIFY-TOKEN',
});

const requestQueueClient = client.requestQueue('my-queue-id');

// Add multiple requests to the queue
await requestQueueClient.batchAddRequests([
    { url: 'http://example.com/foo', uniqueKey: 'http://example.com/foo', method: 'GET' },
    { url: 'http://example.com/bar', uniqueKey: 'http://example.com/bar', method: 'GET' },
]);

// Remove multiple requests from the queue
await requestQueueClient.batchDeleteRequests([
    { uniqueKey: 'http://example.com/foo' },
    { uniqueKey: 'http://example.com/bar' },
]);

Example code using batch operations
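When pre-enqueueing hundreds of thousands of requests, it can help to split them into fixed-size batches yourself. The sketch below shows one way to do that. The helper names chunk and enqueueInBatches are made up for this example, and the default batch size of 25 is an assumption rather than a documented limit, so check the API reference for the actual maximum.

```javascript
// Splits an array into consecutive chunks of at most `size` items.
function chunk(items, size) {
    if (!Number.isInteger(size) || size < 1) throw new RangeError('size must be a positive integer');
    const chunks = [];
    for (let i = 0; i < items.length; i += size) chunks.push(items.slice(i, i + size));
    return chunks;
}

// Sends all requests batch by batch and returns the number of batches sent.
// `addBatch` is any async function that accepts an array of requests, for example
// (batch) => requestQueueClient.batchAddRequests(batch).
async function enqueueInBatches(requests, addBatch, batchSize = 25) {
    let batchCount = 0;
    for (const batch of chunk(requests, batchSize)) {
        await addBatch(batch);
        batchCount += 1;
    }
    return batchCount;
}
```

Passing the batch function as a parameter keeps the helper independent of a particular client and makes it easy to try out without network access.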

Request lifecycle made right

Previously, each request had a time to live (TTL) set based on user data retention. Thanks to the new implementation, requests reflect the retention of the queue and remain available for as long as the queue exists. The system offers indefinite retention for requests in named queues. This facilitates incremental crawling, where you can append new URLs to the queue and resume from where you stopped in subsequent Actor runs. If you’re scraping an e-commerce website with thousands of products, incremental scraping allows you to scrape only the products added since the last product discovery.

In addition to this, you can list all the previously handled requests and un-handle some of them to be re-scraped. This enables you to refresh certain data.

In the following code example, we demonstrate how to use the Apify SDK and Crawlee to create an incremental crawler. By running this Actor multiple times, you can incrementally crawl the source website and only save pages added since the last crawl, as reusing a single request queue ensures that only URLs not yet visited are processed.

import { Actor } from 'apify';
import { CheerioCrawler, Dataset } from 'crawlee';

interface Input {
    startUrls: string[];
    persistRequestQueueName: string;
}

await Actor.init();

// Structure of input is defined in input_schema.json
const {
    startUrls = ['https://docs.apify.com/'],
    persistRequestQueueName = 'persist-request-queue',
} = (await Actor.getInput<Input>()) ?? ({} as Input);

// Open or create the request queue for the incremental scrape.
// By opening the same request queue, the crawler continues where it left off and skips already visited URLs.
const requestQueue = await Actor.openRequestQueue(persistRequestQueueName);

const crawler = new CheerioCrawler({
    requestQueue, // Pass incremental request queue to the crawler.
    requestHandler: async ({ enqueueLinks, request, $, log }) => {
        log.info('Enqueueing new URLs');
        await enqueueLinks();

        // Extract title from the page.
        const title = $('title').text();
        log.info(`New page with ${title}`, { url: request.loadedUrl });

        // Save the URL and title of the loaded page to the output dataset.
        await Dataset.pushData({ url: request.loadedUrl, title });
    },
});

await crawler.run(startUrls);

await Actor.exit();

Example implementation of an incremental crawler

If you are interested in examples of how to use the new request queue features, you can find them in the Apify documentation or read the Apify Academy tutorial.

Implementation

Here are some numbers to give you an idea of the kind of scale the new request queue storage should be able to handle:

  • Request queue API load: 80,000-500,000 API requests are written into request queues every minute.
  • Request queue storage: about 500 million requests are stored every month.

We needed to pick a storage system capable of handling storage on this scale with nearly unlimited capacity. We found that the DynamoDB NoSQL database from AWS is the most scalable and cost-effective solution for this use case. We’ve built a lot of custom code around DynamoDB to deliver the cost-effective and scalable solution we want to provide to Apify customers.

Here are the two main areas we focused on:

1) Cost-effectiveness

AWS offers two billing modes for DynamoDB reads and writes: on-demand capacity mode, where DynamoDB charges you for the data reads and writes your application performs on your tables, and provisioned capacity mode, where you specify the number of reads and writes per second you expect your application to require and DynamoDB charges you a flat rate for that capacity.

We came to the conclusion that neither of these billing options suits the whole lifecycle of the requests in the request queue. By the nature of scraping jobs, most traffic on a request queue happens right after it is created, so that is when the most capacity must be available. On the other hand, there are incremental scrapers that run at regular intervals, in most cases every day, and use the request queue periodically.

With this in mind, we made this efficient by creating a new DynamoDB table every week. This way, we can easily change the capacity of specific DynamoDB tables based on predictable load. We set a high provisioned capacity for the tables from the last two weeks, where the high load is happening, and switch the older tables to on-demand capacity mode. Thanks to this setup, we can handle high loads during the early days after a request queue is created and save costs on DynamoDB usage for old queues, where the load is dramatically lower.
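The weekly table scheme can be sketched roughly as follows. This is a simplified illustration, not the production code: the table naming, the way weeks are counted from the Unix epoch, and the exact two-week cutoff are assumptions made for this example. The returned strings match the two DynamoDB billing mode values, PROVISIONED and PAY_PER_REQUEST (on-demand).

```javascript
const WEEK_MS = 7 * 24 * 60 * 60 * 1000;

// Index of the week a timestamp falls into, counted from the Unix epoch (hypothetical scheme).
const weekIndex = (timestampMs) => Math.floor(timestampMs / WEEK_MS);

// Hypothetical naming: one table per week, chosen by the creation time of the queue.
const tableNameFor = (timestampMs) => `request-queue-requests-week-${weekIndex(timestampMs)}`;

// Tables from the current and the previous week keep provisioned capacity,
// older tables are switched to on-demand mode.
function billingModeFor(tableCreatedAtMs, nowMs = Date.now()) {
    const ageInWeeks = weekIndex(nowMs) - weekIndex(tableCreatedAtMs);
    return ageInWeeks < 2 ? 'PROVISIONED' : 'PAY_PER_REQUEST';
}

// Example with a fixed "now" in week 100.
const now = 100 * WEEK_MS + 5;
const currentTable = tableNameFor(now);              // table for week 100
const lastWeekMode = billingModeFor(99 * WEEK_MS, now);  // one week old
const olderMode = billingModeFor(98 * WEEK_MS, now);     // two weeks old
```

Because the week is derived from a timestamp, a scheduled job can walk over existing tables and switch each one to the cheaper mode once it falls out of the high-load window.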

Figure (dynamodb-usage.png): Lifecycle of DynamoDB table write units

2) Locking mechanism

DynamoDB is a serverless NoSQL database. As in other databases, each item has its own primary key, which lets you access a specific item quickly. If you need to retrieve items by other attributes or sort them, you need to create an additional index. An index is a data structure that contains a subset of attributes from the table, along with an alternate key to support query operations. Each new index increases the cost of every write operation to the table, because the write has to be performed in every index as well as in the main table, so the cost of the operation multiplies.

In our request queue implementation, we needed only one index from the beginning, built internally on the attributes queueId and orderNo. The queueId attribute holds the ID of the request queue the item belongs to, and orderNo initially holds the insertion time of the request in milliseconds elapsed since the epoch. When a user marks a request as handled, orderNo is set to null, which ensures that the request no longer appears at the head of the queue.

This way, we can quickly sort the requests in the queue by insertion time and support the operation that gets the head of the request queue, i.e., returns the oldest requests in the queue. The index is also handy for putting requests at the head of the queue. By default, all newly created requests are put at the tail of the queue, but depth-first crawling strategies need requests to be put at the head. This case is handled by multiplying the orderNo timestamp by -1, which sorts the request to the head of the queue. If we consider Date.now() as the current timestamp, the attribute can be in the interval [-Date.now(), Date.now()].
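The ordering rule can be illustrated with a few lines of plain JavaScript. This is only a sketch of the idea: the helper name initialOrderNo is made up for this example, the timestamps are shortened for readability, and an in-memory sort stands in for the DynamoDB index.

```javascript
// orderNo of a newly added request: the insertion time in milliseconds,
// multiplied by -1 when the request should go to the head of the queue.
const initialOrderNo = (insertedAtMs, { forefront = false } = {}) => (forefront ? -insertedAtMs : insertedAtMs);

const requests = [
    { url: 'http://example.com/first', orderNo: initialOrderNo(1000) },
    { url: 'http://example.com/second', orderNo: initialOrderNo(2000) },
    { url: 'http://example.com/urgent', orderNo: initialOrderNo(3000, { forefront: true }) },
    { url: 'http://example.com/more-urgent', orderNo: initialOrderNo(4000, { forefront: true }) },
];

// Reading in ascending orderNo yields the head of the queue first.
const head = [...requests]
    .sort((a, b) => a.orderNo - b.orderNo)
    .map((request) => request.url);
```

In this sketch, head starts with more-urgent and urgent, followed by first and second. Among requests added to the head, the most recent one comes first, which matches the last-in, first-out behavior that depth-first crawling needs.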

When considering a new index for locked items, we faced a cost issue, because creating a new index would increase the cost for all operations by half. Because of this, we decided to utilize the current index, and implement application logic that handles the locking mechanism. We introduced new intervals that help us distinguish between locked and unlocked items and even handle the proper expiration of the lock. There are three intervals of orderNo for locked and unlocked items:

  • [-Date.now(), Date.now()] - These items are not locked.
  • (Date.now(), +infinity) - These items are locked.
  • (-infinity, -Date.now()) - These items are locked and were added to the head of the queue.

The application logic, based on the current timestamp, decides whether the request is locked or not and behaves accordingly.

async lockRequest(requestToLock, { tableName, queueId, clientKey, lockSecs }) {
    const { id, orderNo } = requestToLock;
    const lockTo = Date.now() + (lockSecs * 1000);
    // Keep the sign of orderNo so that requests added to the head of the queue stay there.
    const lockedOrderNo = (orderNo < 0) ? -lockTo : lockTo;

    let updateResult;
    try {
        updateResult = await dynamoDbClient.update({
            TableName: tableName,
            Key: { id, queueId },
            UpdateExpression: 'SET orderNo = :lockTo, lockByClient = :clientKey',
            // The update succeeds only if orderNo has not changed since the request was read.
            ConditionExpression: 'orderNo = :previousOrderNo',
            ExpressionAttributeValues: {
                ':lockTo': lockedOrderNo,
                ':previousOrderNo': orderNo,
                ':clientKey': clientKey,
            },
            ReturnConsumedCapacity: 'TOTAL',
        });
        // Mirror the stored value on the in-memory request.
        requestToLock.orderNo = lockedOrderNo;
    } catch (err) {
        if (err.name === 'ConditionalCheckFailedException') {
            // Request was already locked by another process.
            return { wasLocked: false, isLockCollision: true };
        }
        throw err;
    }

    return { request: requestToLock, wasLocked: true, lockResult: updateResult };
}

Example implementation of locking an item in DynamoDB

Using this implementation, we were able to reduce the cost of the locking mechanism and keep the write operation to the request queue at the same price.
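To see how the three orderNo intervals translate into application logic, consider the following sketch. It is a simplified illustration rather than the production code, and the function name describeOrderNo and the shape of its return value are hypothetical. It also shows why expired locks need no cleanup: once the current time passes the lock timestamp, the same orderNo falls back into the unlocked interval.

```javascript
// Interprets orderNo relative to the current time:
//   null                 -> the request was handled
//   [-now, now]          -> not locked
//   (now, +infinity)     -> locked
//   (-infinity, -now)    -> locked, and the request was added to the head of the queue
function describeOrderNo(orderNo, nowMs = Date.now()) {
    if (orderNo === null) return { handled: true, locked: false, forefront: false };
    return {
        handled: false,
        locked: Math.abs(orderNo) > nowMs,
        forefront: orderNo < 0,
    };
}

const now = 1_700_000_000_000;
const lockTo = now + 60 * 1000; // lock for 60 seconds

const lockedNow = describeOrderNo(lockTo, now);          // inside the lock window
const lockedHead = describeOrderNo(-lockTo, now);        // locked request from the head of the queue
const afterExpiry = describeOrderNo(lockTo, lockTo + 1); // one millisecond after the lock expired
```

Here lockedNow.locked and lockedHead.locked are true, while afterExpiry.locked is false even though the stored value never changed.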

While the overview provided here outlines the core aspects of our new request queue system, it's important to note that the actual implementation is far more complicated. The system also leverages other technologies, such as Redis for cache management and Amazon S3 for scalable and cheaper storage of large objects.

Customer impact

The latest updates change how the storage and Apify tools, especially Crawlee, use the request queue. The new version of Crawlee causes the system to perform twice as many write operations on DynamoDB, and only half as many read operations. This is because the system now locks requests on the server, which increases write operations.

For Apify customers, the effect of these changes depends on which Apify solution they use. However, it's good to know that for most users, the cost of the request queue is just a small part of the total cost, so this change won’t really affect their expenses.

Users who created their own Actor based on Crawlee can see cost changes on the Apify platform after updating to the new Crawlee release.

For users of ready-made Actors from Apify Store, the changes depend on the developers of these Actors. If the change in cost is significant, the developer should inform the affected user.

These changes should not affect other users. If you run into any issues with the new request queue system, let Apify support know.

Jakub Drobník
Full-stack developer from the early days at Apify, involved in almost all Apify platform projects since then. Currently switching between work and travelling around the globe on a biweekly basis.
