Event-based data ingest with VAST, AWS Lambda, and AWS SQS


Programmatically ingesting event-based data is key to refining AI models. By combining AWS Lambda and the Simple Queue Service (SQS), it's simple to ingest event-based data into a VAST on Cloud system.

 

For this example, we'll use an AWS Lambda function that listens on an SQS queue for URLs, downloads the contents of each URL, and uploads them to a VAST cluster over S3. Then, because the VAST Data Platform supports multiprotocol access to all data, we can access those files from an EC2 instance using NFS.

Configure VAST Cluster

On the VAST Cluster, we'll need:

  • A user with S3 access and secret keys.

  • A view that can be accessed as an S3 bucket and over NFS.

 

To start, we need to create a user and provision them S3 access and secret keys. Log in to the VAST UI as a privileged user, such as an admin.

User

Under User Management -> Users, click the Create User button. Set the name to "ubuntu" and the UID to 1000, then click Create.

The screenshot shows the Add User dialog, with fields to enter the name "ubuntu", set the UID to 1000, and configure permissions such as allowing or disallowing bucket creation and deletion and the S3 Superuser role, along with options to assign groups and select identity policies.

Once the required permissions and group assignments are selected, click Create to finish adding the user.

Edit the user we just created and click "Create new key" to provision an access key and secret key. Be sure to copy the secret key now, as it will not be accessible after you leave this screen.

The image shows the Update User dialog, including user details and access control settings such as permission to create or delete buckets and the S3 Superuser role. It also displays the user's access keys with their current status and an option to copy the secret key before closing the window.

View

Now we want to create a View that can be accessed both as an S3 bucket and over NFSv3.

 

Under Element Store -> Views click Create View. Set:

  • Path to /ingestbucket

  • Protocols to NFSv3 and S3 Bucket.

  • S3 bucket name to ingestbucket.

  • Policy name to s3_default_policy.

  • Select "Create new directory for the view."

 

Under S3 settings, set the Bucket Owner to the ubuntu user we created earlier and click Create.

The image displays the "Add View" screen with options to configure a view that supports both NFSv3 and S3 Bucket protocols. The selected tenant is 'default,' and the path specified is '/ingestbucket.' Additionally, there an S3 bucket named 'ing estbucket' has been configured under these settings.

A checkbox option indicates the creation of a new directory for this view is enabled by default.

The next screen shows the view's S3 settings, where administrators can configure the bucket owner, whether S3 versioning is enabled, which users and groups have creator privileges, and whether anonymous access is permitted.

Now we have an S3 bucket and an NFS mount that accesses the same data.
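
As a quick sanity check, you can list the new bucket with the AWS CLI pointed at the VAST cluster's S3 endpoint. This is a minimal sketch: the endpoint address is a placeholder, the keys are the ones provisioned for the ubuntu user above, and path-style addressing is set to match the addressing style used later in the Lambda code.

$ export AWS_ACCESS_KEY_ID="<ubuntu access key>"
$ export AWS_SECRET_ACCESS_KEY="<ubuntu secret key>"
$ aws configure set default.s3.addressing_style path
$ aws s3 ls s3://ingestbucket --endpoint-url http://vast.cluster.address.here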

Configuring AWS Resources

AWS SQS

Start by creating an SQS queue for producers to send URLs for the Lambda to process. Use a Standard type, call it WebContentImporter, and accept the rest of the defaults.

The image shows the Amazon SQS queue creation interface, where users choose between a "Standard" or "FIFO" queue type based on their application's delivery-guarantee and message-ordering requirements. Note that a queue's type cannot be changed after it is created.
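
If you prefer scripting over the console, the same queue can be created with the AWS CLI; a minimal sketch, assuming the CLI is configured for your account and region:

$ aws sqs create-queue --queue-name WebContentImporter

The command returns the queue URL, which is handy to keep for sending test messages later.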

AWS Lambda

Create an AWS Lambda function named WebContentImporter with a Python 3.12 runtime. This gives us a function shell into which we'll upload the code separately.

The image shows the AWS Lambda "Create function" page, where users define basic parameters such as the function name, runtime (Python 3.12), architecture (x86_64), and permissions before creating the function.
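
Alternatively, the function can be created from the AWS CLI. This is a sketch only; it assumes an execution role you have already created (the ARN below is a placeholder) and the lambda_function.zip deployment package built in the packaging step below:

$ aws lambda create-function \
    --function-name WebContentImporter \
    --runtime python3.12 \
    --handler lambda_function.lambda_handler \
    --role arn:aws:iam::<account-id>:role/<lambda-execution-role> \
    --zip-file fileb://lambda_function.zip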

 

Create a Python file called lambda_function.py with the contents below, updating these global variables for your environment:

  • ENDPOINT_URL - the DNS name or IP address of your VAST on Cloud data VIP pool

  • S3_BUCKET_NAME - the name of the S3 bucket on VAST

  • S3_ACCESS_KEY - access key for uploading into the bucket

  • S3_SECRET_KEY - secret key for uploading into the bucket

 

Note: as a best practice, secrets should be stored in a key management system and not hard-coded in the Lambda code.

 

#!/usr/bin/env python3

"""

Read a message from SQS that contains a URL in the message body, whose contents

we want to upload to VAST via S3.

"""

 

import hashlib

import io

from urllib.parse import urlparse

 

from botocore.client import Config

import boto3

import requests

 

ENDPOINT_URL = "http://vast.cluster.address.here"

S3_BUCKET_NAME = "ingestbucket"

 

# As a best-practice, secrets should not be hard-coded but handled by a keystore.

# See https://aws.amazon.com/blogs/compute/securely-retrieving-secrets-with-aws-lambda/

S3_ACCESS_KEY = "xxx"

S3_SECRET_KEY = "yyy"

 

def fetch_url(url):

    """Fetch a URL and return a file-like object of the contents."""

    result = requests.get(url, verify=False)

    return io.BytesIO(result.content)

def upload_file(fileobj, object_name):

    """Upload a file-like object to the VAST S3 bucket."""

    s3_resource = boto3.resource(

        "s3",

        endpoint_url=ENDPOINT_URL,

        aws_access_key_id=S3_ACCESS_KEY,

        aws_secret_access_key=S3_SECRET_KEY,

        aws_session_token=None,

        use_ssl=True,

        verify=False,

        config=Config(s3={"addressing_style": "path"}),

    )

 

    s3_resource.Bucket(S3_BUCKET_NAME).put_object(

        Key=object_name,

        Body=fileobj.read(),

    )

def build_object_name(url):

    """Build an object name from a URL"""

    url_parts = urlparse(url)

    object_name = url_parts.netloc + url_parts.path

    if url_parts.query:

        object_name += "?" + hashlib.md5(url_parts.query.encode("utf-8")).hexdigest()

    return object_name

def lambda_handler(event, context):
    """Process a batch of SQS records; return the IDs of any failed messages
    as a partial batch response so that only those messages are retried."""

    if event:

        batch_item_failures = []

        sqs_batch_response = {}

 

        for record in event["Records"]:

            try:

                url = record["body"]

                upload_file(fetch_url(url), build_object_name(url))

            except Exception as exc:
                # Log the failure to CloudWatch and record the message ID so
                # that only the failed messages are retried.
                print(f"Failed to process message {record['messageId']}: {exc}")
                batch_item_failures.append({"itemIdentifier": record["messageId"]})

 

        sqs_batch_response["batchItemFailures"] = batch_item_failures

        return sqs_batch_response

 

Now we need to download the Python package dependencies and package them up with the code:

$ mkdir ./packages

$ pip3 install --target ./packages requests boto3 botocore

$ cd packages

$ zip -r ../lambda_function.zip .

$ cd ..

$ zip lambda_function.zip lambda_function.py

 

Next, upload the bundle to the Lambda function we created. The following assumes that you have the AWS CLI installed and configured:

$ aws lambda update-function-code --function-name WebContentImporter --zip-file fileb://lambda_function.zip

 

To connect the Lambda function to the SQS queue, we need to update its IAM role. In IAM, edit the role that was auto-created for the function to include the following permissions on the SQS queue:

  • sqs:ReceiveMessage

  • sqs:DeleteMessage

  • sqs:GetQueueAttributes

The screenshot displays an AWS Lambda execution role policy editor, where permissions have been configured to allow specific actions such as creating log groups and streams in Amazon CloudWatch Logs, along with managing messages in an SQS queue named "WebContentImporter".
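
The same permissions can also be granted from the CLI by attaching an inline policy to the execution role. This is a hedged sketch; the role name, policy name, account ID, and region are placeholders for your environment:

$ aws iam put-role-policy \
    --role-name <lambda-execution-role> \
    --policy-name WebContentImporterSQSAccess \
    --policy-document '{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes"],
            "Resource": "arn:aws:sqs:us-east-1:<account-id>:WebContentImporter"
        }]
    }'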

 

Now we can add an SQS trigger to the Lambda function. It's recommended that you set a non-zero batch window time during testing.

The image shows the AWS Lambda interface where an SQS trigger is being configured to receive messages from the SQS queue named "WebContentImporter" in the us-east-1 region. The batch size and batch window control how many records are sent per invocation and how long records accumulate before the function is invoked; here the batch window is set to 30 seconds.
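
The trigger can also be created from the CLI as an event source mapping; a sketch, with the account ID in the queue ARN as a placeholder. Enabling ReportBatchItemFailures is what allows the batchItemFailures response returned by the handler to mark only the failed messages for retry:

$ aws lambda create-event-source-mapping \
    --function-name WebContentImporter \
    --event-source-arn arn:aws:sqs:us-east-1:<account-id>:WebContentImporter \
    --batch-size 10 \
    --maximum-batching-window-in-seconds 30 \
    --function-response-types ReportBatchItemFailures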

 

Finally, update the Lambda config to:

  • Increase the timeout from 3 seconds to 30 seconds or more

  • Connect the Lambda to the VPC where the VAST on Cloud instance resides

The configuration page displays settings such as timeout, memory, and SnapStart status for an AWS Lambda function. The timeout is set to 0 minutes and 30 seconds, with a memory allocation of 128 MB and no active SnapStart feature.
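
Both settings can also be applied from the CLI; a sketch, with the subnet and security-group IDs as placeholders for the VPC where VAST on Cloud runs. Note that attaching the function to a VPC also requires EC2 network-interface permissions on the execution role (for example, via the AWSLambdaVPCAccessExecutionRole managed policy).

$ aws lambda update-function-configuration \
    --function-name WebContentImporter \
    --timeout 30 \
    --vpc-config SubnetIds=<subnet-id>,SecurityGroupIds=<security-group-id>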

 

Send a message

Now we're ready to send a message. In the SQS queue, click the Send and Receive messages button, fill out a URL to process, and click Send message.

The screenshot displays the interface for sending and receiving messages in Amazon SQS, where users can input message content into a designated field before clicking "Send message" to enqueue it. The delivery delay setting is adjustable between 0 seconds and 15 minutes from this screen.
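
The same message can be sent from the CLI; a sketch, with the queue URL (from the console or the create-queue output) and the URL to ingest as placeholders:

$ aws sqs send-message \
    --queue-url https://sqs.us-east-1.amazonaws.com/<account-id>/WebContentImporter \
    --message-body "https://example.com/somefile.pdf"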

 

The Lambda function will pick up this message from the queue, download the URL, and upload its contents to the S3 bucket on VAST.

Accessing the data over NFS

Now that the data is on VAST, it can be accessed using other protocols like NFS. For instance, create an Ubuntu-based EC2 instance in the same VPC as the VAST cluster, mount the view we created initially, and list the contents of the bucket:

 

$ sudo mkdir /mnt/vast

$ sudo mount -t nfs -o vers=3 vast-ip-address:/ingestbucket /mnt/vast

$ ls /mnt/vast

Conclusion

This example demonstrates a scalable framework for programmatically retrieving event-based data, applying simple transformations, and ingesting the results into a VAST cluster.