Your AWS Lambda stock data pipeline just got an upgrade! After using my automated collector for a while, I found two crucial areas for improvement. Dive in to see how I refined it for even better performance and reliability.

Before you read this article, you need to read the last post, where I showed how to make this simple but powerful tool. It’s OK, I can wait.

Now that you have read the post and looked at the source code, can you guess what I changed? Before you read any more, put your guess in the comments and come back.

How Much to Download

I looked at my list of symbols and found that it was missing about half of the symbols I wanted to download, so I increased the count from 5857 to 11308 stocks. This immediately perked up my “something can go wrong” sense, because Lambdas are known for their limited resources. The easiest fix is to download the stock data in batches so there are no memory issues, and I made the batch size configurable via an environment variable; at 11308 symbols with a batch size of 6000, that works out to two batches per run. After looking at my HISTORY_DAYS variable, I also decided to go back five days instead of seven, because the original intent was to go back a market week, not a calendar week.
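Before the real code shows up later in the post, here is the batching idea in miniature; a toy sketch with made-up symbols, not the Lambda’s actual code.

import java.util.List;

public class BatchDemo {
    public static void main(String[] args) {
        // Toy illustration of fixed-size batching over a symbol list.
        List<String> symbols = List.of("AAPL", "MSFT", "GOOG", "AMZN", "TSLA");
        int batchSize = 2; // in the Lambda this comes from SYMBOLS_BATCH_SIZE

        for (int i = 0; i < symbols.size(); i += batchSize) {
            // subList returns a view, so no copy of the big list is made
            List<String> batch = symbols.subList(i, Math.min(i + batchSize, symbols.size()));
            System.out.println("Batch " + (i / batchSize + 1) + ": " + batch);
        }
    }
}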

Terraform Code

lambda.tf changes

resource "aws_lambda_function" "extract_market_data_aws_lambda" {
  depends_on = [aws_s3_object.s3_object_upload]
  function_name = var.lambda_function
  role          = aws_iam_role.iam_for_lambda.arn
  handler       = var.lambda_handler
  source_code_hash = filebase64sha256(var.file_location)
  s3_bucket     = aws_s3_bucket.code_bucket.bucket
  s3_key        = var.lambda_filename
  runtime       = var.runtime
  timeout       = var.timeout
  memory_size = 512
  environment {
    variables = {
      TZ = var.timezone
      DATA_BUCKET = aws_s3_bucket.data_bucket.bucket
      SYMBOLS = var.symbols_filename
      HISTORY_DAYS = var.history_days
      SYMBOLS_BATCH_SIZE = var.symbols_batch_size
    }
  }
}

The only change here is the addition of the SYMBOLS_BATCH_SIZE environment variable, wired to the new var.symbols_batch_size Terraform variable.

variables.tf changes

variable "history_days" {
  default = 5
}

variable "symbols_batch_size" {
  default = 6000
}

No surprises here, just variables and defaults.

Java Code

application.yml changes

data:
  bucket:
    name: ${DATA_BUCKET}
  symbols:
    file: ${SYMBOLS:symbols.txt}
    batch-size: ${SYMBOLS_BATCH_SIZE:6000}
history:
  days: ${HISTORY_DAYS:5}

I am showing the configuration mostly for reference. Again, no surprises here: I set data.symbols.batch-size to a default of 6000 and allow it to be overridden via the SYMBOLS_BATCH_SIZE environment variable.

StockPriceServiceImpl.java changes

package com.darylmathison.market.service.impl;

import com.darylmathison.market.service.StockPriceService;
import com.darylmathison.market.model.StorageStockBar;
import com.darylmathison.market.dto.PriceDataDTO;
import com.darylmathison.market.service.S3Service;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.logging.Logger;
import java.util.logging.Level;
import java.util.zip.GZIPOutputStream;

/**
 * Implementation of StockPriceService for downloading and storing stock price data.
 */
@Service
public class StockPriceServiceImpl implements StockPriceService {

    // Collaborators and configuration carried over from the previous version,
    // reconstructed here so the class reads as a whole.
    private static final Logger logger = Logger.getLogger(StockPriceServiceImpl.class.getName());

    private final S3Service s3Service;
    private final PriceDataDTO priceDataDTO;

    public StockPriceServiceImpl(S3Service s3Service, PriceDataDTO priceDataDTO) {
        this.s3Service = s3Service;
        this.priceDataDTO = priceDataDTO;
    }

    @Value("${data.bucket.name}")
    private String dataBucketName;

    @Value("${data.symbols.file}")
    private String symbolsFileKey;

    @Value("${history.days}")
    private int historyDays;

    @Value("${data.symbols.batch-size:3000}")
    private int symbolsBatchSize;

    

    /**
     * Downloads price data for configured symbols and stores it in S3.
     *
     * @return Number of price records processed
     */
    @Override
    public int getPriceData() {
        try {
            logger.info("Starting price data download process");

            // Get symbols from S3
            List<String> symbols = s3Service.fetchList(dataBucketName, symbolsFileKey);
            logger.info("Fetched " + symbols.size() + " symbols from S3");

            // Calculate date range
            LocalDate endDate = LocalDate.now();
            LocalDate startDate = endDate.minusDays(historyDays);
            logger.info("Downloading price data from " + startDate + " to " + endDate);

            // Process symbols in batches
            int totalRecords = 0;
            for (int i = 0; i < symbols.size(); i += symbolsBatchSize) {
                int endIndex = Math.min(i + symbolsBatchSize, symbols.size());
                List<String> symbolsBatch = symbols.subList(i, endIndex);
                int batchNumber = i / symbolsBatchSize + 1;
                logger.info("Processing batch " + batchNumber + 
                           " with " + symbolsBatch.size() + " symbols (from index " + i + " to " + (endIndex - 1) + ")");

                // Get price data for this batch
                List<StorageStockBar> batchPriceData = priceDataDTO.getPriceData(symbolsBatch, startDate, endDate);
                logger.info("Retrieved " + batchPriceData.size() + " price records for current batch");

                // Convert batch data to compressed CSV
                byte[] compressedData = generateCompressedCSV(batchPriceData);
                logger.info("Generated compressed CSV data for batch " + batchNumber + ": " + compressedData.length + " bytes");

                // Create S3 key with today's date and batch number
                String key = generateS3Key(endDate, batchNumber);

                // Upload batch to S3
                s3Service.putObject(dataBucketName, key, compressedData);
                logger.info("Successfully uploaded batch " + batchNumber + " price data to S3: " + key);

                // Add to the total count
                totalRecords += batchPriceData.size();
            }

            logger.info("Total retrieved price records: " + totalRecords);

            return totalRecords;
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to process price data: " + e.getMessage(), e);
            throw new RuntimeException("Failed to process price data: " + e.getMessage(), e);
        }
    }

    // generateCompressedCSV(...) and generateS3Key(...) are unchanged from the previous post and omitted here.
}

The biggest change is the old-school for loop that calculates the indexes to use on the symbols list. I thought about using a generator with an IntStream, but it would have been the same code and less readable. Simple code is easier to read, easier to debug, and more reliable in the end. No one cares about clever code during a Saturday morning incident.
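For the curious, here is a sketch of the IntStream version I decided against. It assumes a hypothetical processBatch helper that wraps the body of the loop and returns that batch’s record count.

import java.util.stream.IntStream;

// Functionally equivalent to the plain for loop, just harder to scan.
// processBatch is a hypothetical helper, not a method in the real class.
int totalRecords = IntStream
    .iterate(0, i -> i < symbols.size(), i -> i + symbolsBatchSize)
    .map(i -> processBatch(symbols.subList(i, Math.min(i + symbolsBatchSize, symbols.size()))))
    .sum();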


Security Concerns

Now this tool is secure-ish (technical term?) because of the lack of user input it needs. I do need to log in to AWS from time to time, but that is not this tool’s concern. Its concern is downloading stock prices every Friday at 5 PM US/Eastern. Keeping to its concerns, it does need to access Alpaca’s databases, and that requires an API key with a matching secret key. How this tool gets those values is where the last version needed help: it read them from environment variables, and while that’s tempting, it is not the best way to handle secrets. The best way is to take advantage of AWS Secrets Manager. Paired with Terraform, this becomes an easy way to grab some secrets.

Terraform Code

secrets.tf

resource "aws_secretsmanager_secret" "alpaca_api_key" {
  name        = "alpaca_api_key"
  description = "Secret for my Alpaca Api Key"

  tags = {
    Environment = "staging"
    Project     = "download prices"
  }
}

resource "aws_secretsmanager_secret_version" "alpaca_api_key_version" {
  secret_id = aws_secretsmanager_secret.alpaca_api_key.id
  secret_string = jsonencode({
    apiKey = var.alpacaApiKey
    secretKey = var.alpacaSecretKey
  })
}

resource "aws_iam_policy" "alpaca_secrets_read_policy" {
  name        = "alpaca_secrets_read_policy"
  description = "Allows reading of the alpaca api secrets"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = [
          "secretsmanager:GetSecretValue",
          "secretsmanager:DescribeSecret",
        ]
        Resource = [
          aws_secretsmanager_secret.alpaca_api_key.arn
        ]
      },
    ]
  })
}

The actual secret creation is in the first two resource definitions. After this code was written, I talked to an actual Terraform expert, Len Jaffe, about setting up secrets. He said that the creation and setting of the secret should happen out of band, which means a data block would be defined to reference the secret instead; I can make that change another time. Notice how I put the apiKey and secretKey in a single JSON object. The Java code to decode these values is easy, and it keeps all the related secrets in one lookup. Considering that AWS Secrets Manager charges you by the retrieval, one call is preferable.
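If I do follow Len’s advice, the Terraform side would look something like this; a sketch that assumes the secret is created out of band but keeps the same name.

# Read-only reference to a secret created and populated outside Terraform.
data "aws_secretsmanager_secret" "alpaca_api_key" {
  name = "alpaca_api_key"
}

data "aws_secretsmanager_secret_version" "alpaca_api_key_current" {
  secret_id = data.aws_secretsmanager_secret.alpaca_api_key.id
}

The read policy would then point at data.aws_secretsmanager_secret.alpaca_api_key.arn instead of the resource’s ARN.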

The third resource block defines the policy needed to read the secret. From both a modularity and a security perspective, it made more sense to scope this policy to just the Alpaca secrets.

lambda.tf changes

resource "aws_iam_role_policy_attachment" "lambda_secrets_access" {
  role       = aws_iam_role.iam_for_lambda.name
  policy_arn = aws_iam_policy.alpaca_secrets_read_policy.arn
}

resource "aws_lambda_function" "extract_market_data_aws_lambda" {
  depends_on = [aws_s3_object.s3_object_upload]
  function_name = var.lambda_function
  role          = aws_iam_role.iam_for_lambda.arn
  handler       = var.lambda_handler
  source_code_hash = filebase64sha256(var.file_location)
  s3_bucket     = aws_s3_bucket.code_bucket.bucket
  s3_key        = var.lambda_filename
  runtime       = var.runtime
  timeout       = var.timeout
  memory_size = 512
  environment {
    variables = {
      TZ = var.timezone
      DATA_BUCKET = aws_s3_bucket.data_bucket.bucket
      SYMBOLS = var.symbols_filename
      HISTORY_DAYS = var.history_days
      ALPACA_SECRET_NAME = aws_secretsmanager_secret.alpaca_api_key.name
      SYMBOLS_BATCH_SIZE = var.symbols_batch_size
    }
  }
}

So you can see I have attached the read policy to the Lambda role and replaced the ALPACA_API_KEY and ALPACA_SECRET_KEY environment variables with ALPACA_SECRET_NAME. Both changes should be straightforward: the policy allows the Lambda to read the secret, and the Lambda needs to know the name of the secret to retrieve.

Java Code Changes

pom.xml changes

    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>secretsmanager</artifactId>
        <version>${awssdk.version}</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>auth</artifactId>
        <version>${awssdk.version}</version>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>regions</artifactId>
        <version>${awssdk.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.17.2</version>
    </dependency>

Here are the libraries I used to pull in the keys. Notice the jackson-databind addition? It is what turns the secret’s JSON payload into a Java object. Yes, it is that easy.

ApiKeyPair.java

package com.darylmathison.market.model;

@lombok.Data
public class ApiKeyPair {
  String apiKey;
  String secretKey;
}

Yes, I created a data class named ApiKeyPair. Notice that the members are named exactly as I defined the JSON object in the secrets.tf file. For those who have used Jackson before, you already know what is going to happen. For those who have not used Jackson before, keep reading. This project also uses Lombok, as one can tell from the @lombok.Data annotation. If you want to know more about these projects from my point of view, mention it in the comments.
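To make the Jackson step concrete before the real service shows up, here is a toy example with made-up key values; only the JSON field names matter.

import com.darylmathison.market.model.ApiKeyPair;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonDemo {
  public static void main(String[] args) throws Exception {
    // The secret string is plain JSON whose field names match the members
    // of ApiKeyPair, so Jackson can fill in the object directly.
    String json = "{\"apiKey\":\"PK_EXAMPLE\",\"secretKey\":\"sk_example\"}";
    ApiKeyPair pair = new ObjectMapper().readValue(json, ApiKeyPair.class);
    System.out.println(pair.getApiKey()); // prints PK_EXAMPLE
  }
}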

SecretsServiceImpl.java

package com.darylmathison.market.service.impl;

import com.darylmathison.market.model.ApiKeyPair;
import com.darylmathison.market.service.SecretsService;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueResponse;
import software.amazon.awssdk.services.secretsmanager.model.SecretsManagerException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class SecretsServiceImpl implements SecretsService {

  @Value("${aws.region}")
  private String awsRegion;

  @Override
  public ApiKeyPair getSecretApiKeyPair(String secretName) {
    String jsonString = getSecret(secretName);
    // The secret is stored as JSON, so parse it into an ApiKeyPair
    ObjectMapper mapper = new ObjectMapper();
    try {
      return mapper.readValue(jsonString, ApiKeyPair.class);
    } catch (JsonProcessingException e) {
      System.err.println("Error parsing secret JSON: " + e.getMessage());
      throw new RuntimeException("Failed to parse secret JSON: " + e.getMessage(), e);
    }
  }

  @Override
  public String getSecret(String secretName) {
    GetSecretValueRequest getSecretValueRequest = GetSecretValueRequest.builder()
                .secretId(secretName)
                .build();

    try(SecretsManagerClient secretsClient = buildSecretsManagerClient()) {
        GetSecretValueResponse getSecretValueResponse = secretsClient.getSecretValue(getSecretValueRequest);
        return getSecretValueResponse.secretString();
    } catch (SecretsManagerException e) {
        System.err.println("Error retrieving secret: " + e.awsErrorDetails().errorMessage());
        throw new RuntimeException("Failed to retrieve secret: " + e.awsErrorDetails().errorMessage(), e);
    }
  }

  protected SecretsManagerClient buildSecretsManagerClient() {
    return SecretsManagerClient.builder().region(Region.of(awsRegion)).build();
  }

}

This is my simple SecretsService class. It is syntactic sugar for accessing AWS Secrets Manager. I decided to wrap any exceptions and rethrow them, because if I cannot get to my secrets, I have big issues. In my experience, that is the case because it means I cannot get to needed services like databases or third-party data providers (Alpaca.markets).

The part I have been hinting at is located in getSecretApiKeyPair. I create an ObjectMapper and then call mapper.readValue(jsonString, ApiKeyPair.class) to get an instantiated ApiKeyPair from the JSON string retrieved from Secrets Manager. The getSecret call is a basic AWS client call: a request is created, specifically a GetSecretValueRequest; a client, specifically a SecretsManagerClient, is created and passed the request; and the response is queried for the requested secret data. I could make the SecretsManagerClient stay around longer, but I do not feel the need. We are working in a Lambda, and it is only called once at the moment. If I had two or more calls to Secrets Manager, I would keep the client up until the service was destroyed, along the lines of the sketch below.
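Here is roughly what that longer-lived client could look like. This is a hypothetical rework, not what ships, and it assumes a Jakarta-based Spring version for the @PreDestroy import.

package com.darylmathison.market.service.impl;

import jakarta.annotation.PreDestroy; // javax.annotation.PreDestroy on older Spring versions
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

// Hypothetical variant that reuses one client across calls.
@Service
public class LongLivedSecretsService {

  private final SecretsManagerClient client;

  public LongLivedSecretsService(@Value("${aws.region}") String awsRegion) {
    this.client = SecretsManagerClient.builder().region(Region.of(awsRegion)).build();
  }

  public String getSecret(String secretName) {
    return client.getSecretValue(
        GetSecretValueRequest.builder().secretId(secretName).build()
    ).secretString();
  }

  @PreDestroy
  public void shutdown() {
    client.close(); // release underlying HTTP connections when the context shuts down
  }
}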

SpringConfig.java changes

package com.darylmathison.market.config;

import com.darylmathison.market.model.ApiKeyPair;
import com.darylmathison.market.service.SecretsService;
import net.jacobpeterson.alpaca.AlpacaAPI;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

@Configuration
@ComponentScan(basePackages = "com.darylmathison.market") // Scans components in this package
public class SpringConfig {

  @Value("${aws.region}")
  private String awsRegion;

  @Value("${alpaca.secret-name}")
  private String alpacaSecretName;

  @Bean
  public AlpacaAPI alpacaAPI(SecretsService secretsService) {
    ApiKeyPair alpacaApiKeyPair = secretsService.getSecretApiKeyPair(alpacaSecretName);
    return new AlpacaAPI(alpacaApiKeyPair.getApiKey(), alpacaApiKeyPair.getSecretKey());
  }

  // The s3Client bean from the previous version is unchanged and not shown here.
}

The big changes here are the removal of alpacaApiKey and alpacaSecretKey, which were replaced by alpacaSecretName. I then passed the SecretsService into the alpacaAPI bean method and used it to retrieve the ApiKeyPair. All of this is straightforward Spring configuration magic.

Possible Improvements

From a performance perspective, I can’t think of a reason to change anything. It currently takes two minutes instead of one to download and store all of the data. That is a 100% increase in time, but it is well inside my performance requirements, so it stays for now. If I did need to reduce the run time, I could play with the batch size and the memory setting. Neither of those values requires code changes, so that is a win. From a security standpoint, I could follow Len’s advice and remove the creation of the secret, referring to it in the Terraform files instead. That would be a simple fix and can be done in the future, but I am happy with it for now.

Conclusion

After seeing some issues with my lambda, I decided to improve it. The first improvement was increasing the amount of data to download; to prevent resource exhaustion, I added the ability to pull the data in batches. The other, more important change was taking advantage of AWS Secrets Manager to keep my API keys safe from prying eyes.

To see the code for yourself, please look here. To see the full set of changes, including unit tests, please look at the pull request.

Help Me Help You

Just by reading, liking and commenting, you are already supporting me in an important way. The link below is for those who wish to support me at the next level.

Support My Work