The Problem

I have been involved with the stock markets ever since I was a child. While my dad spared me from the down-and-dirty details, he was open about investing in the stock market. That early exposure gave me a natural fascination with the markets. There are many places to look at graphs, but I would like to make my own, and that requires data. The easiest way to have my own database is to collect data from a reliable source.

Designing a Cloud Native ETL Process

When designing an ETL process, one must account for:

  • What data is being gathered?
  • Why is the data being used?
  • Where will the data be stored?
  • How will the data be gathered?

What data is being gathered?

This will be stock market data at 15-minute intervals from the prior week.

Why is the data being used?

This data will be used for historical stock analysis. While I know past performance does not guarantee future results, I have found it to be a good predictor.

Where will the data be stored?

I could put the data directly into a database, considering I run a PostgreSQL instance on my laptop. However, I am not always doing stock analysis, so keeping it outside of a database is good too. Considering historical data is not known for changing much, as long as the data is available and in a known format, that is good enough.

Implementation

Tech Stack

I will be using Java/Spring for the codebase. Terraform will be the IaC component, as it is what enterprise operations teams normally use.

Cloud Native

I chose a cloud-native solution because it allows me to do other things with my laptop besides wondering whether my download process is running and whether it is safe to restart. I will be using AWS because I am already a customer of that cloud provider and I am familiar with the services I will be using.

Final Data Form

Market data, for me, tends to be on the cool side until I need it. For that reason, I will be taking the data from my brokerage and converting it to a csv.gz file. This file will be stored in an AWS S3 bucket until I need it.
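
Pulling that data back out when I do need it is straightforward. Below is a minimal sketch of reading a csv.gz file back into memory with plain JDK classes; the file name is a placeholder and the sketch assumes the object has already been downloaded from S3 to local disk.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.GZIPInputStream;

public class CsvGzReader {

  public static void main(String[] args) throws Exception {
    // Placeholder file name; assumes the csv.gz object was already copied out of S3
    Path localFile = Path.of("prices_example.csv.gz");

    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new GZIPInputStream(Files.newInputStream(localFile))))) {
      // The first line is the CSV header: symbol,timestamp,open,high,low,close,volume
      List<String> rows = reader.lines().toList();
      rows.forEach(System.out::println);
    }
  }
}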

Process flow

The lambda will be triggered every Friday at 5pm US/Eastern. It will load the list of symbols from a file named “symbols.txt” in my data bucket. That list of symbols will form a request for the last 5 days (M-F) of OHLC (Open, High, Low, Close) prices at 15-minute intervals. After the request is fulfilled, the data will be converted to CSV. Each row will be of the form symbol, timestamp, open, high, low, close, volume. After the CSV transformation, the data will be gzipped. I like the csv.gz format because AWS Glue understands it natively and gzip has good support in Java. The compressed text will be stored in an S3 bucket. For those who like pictures, I have a diagram below.

All of the code displayed here can be found at https://github.com/darylmathison/aws-stock-price-download. I will be breaking it all down into the Terraform code and the Java code.

Terraform Code

While this is a small project, I still broke the configuration up into separate files to keep it organized.

main.tf

This lays out the basics of the providers and any extra modules to be brought in.

terraform {
  required_version = ">= 1.0.0" # Ensure that the Terraform version is 1.0.0 or higher

  required_providers {
    aws = {
      source = "hashicorp/aws" 
      version = "~> 5.98.0"       
    }
    random = {
      source  = "hashicorp/random"
      version = "3.7.2"
    }
  }
}

provider "aws" {
  region = var.region 
}
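
The provider block references var.region, and the files below reference several more variables. Those live in a variables.tf that is not reproduced in this post; a sketch of what a few of the definitions might look like is below (the names match the var. references used here, but the defaults are purely illustrative assumptions).

variable "region" {
  description = "AWS region to deploy into"
  type        = string
  default     = "us-east-1" # illustrative default
}

variable "data_bucket_name" {
  description = "Name prefix for the data bucket"
  type        = string
}

variable "symbols_filename" {
  description = "S3 key of the symbols list"
  type        = string
  default     = "symbols.txt"
}

variable "cron_friday_after_market" {
  description = "EventBridge schedule; cron expressions are evaluated in UTC, so 5pm US/Eastern lands at 21:00 or 22:00 UTC depending on daylight saving"
  type        = string
  default     = "cron(0 22 ? * FRI *)" # illustrative
}

The remaining variables (code_bucket, lambda_filename, file_location, lambda_handler, runtime, timeout, timezone, history_days, and the Alpaca key variables) follow the same pattern.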

s3.tf

This file creates two buckets: one for code to be uploaded into lambdas, named “code,” and one for data, named “data.” The code bucket holds packaged code to be incorporated into a lambda. The data bucket holds the downloaded data, and it is also where I keep the list of stocks I wish to track.

resource "random_id" "bucket_suffix" {
  byte_length = 10
}


resource "aws_s3_bucket" "code_bucket" {
  bucket = "${var.code_bucket}-${random_id.bucket_suffix.hex}"
}

resource "aws_s3_bucket" "data_bucket" {
  bucket = "${var.data_bucket_name}-${random_id.bucket_suffix.hex}"
}

data.tf

This file holds all of the static data objects and defines where they get uploaded. I only have the symbols.txt file right now.

resource "aws_s3_object" "symbols_upload" {
  depends_on = [aws_s3_bucket.data_bucket]
  bucket = aws_s3_bucket.data_bucket.bucket
  key    = var.symbols_filename
  source = var.symbols_location
  etag = filemd5(var.symbols_location)
}

lambda.tf

This defines the lambda that will get filled with Java code. The majority of the file defines the permissions the lambda needs to perform its task. I have learned the hard way to set the preferred timezone in the environment variables. One may have noticed I used environment variables for the Alpaca API key. Last I checked, AWS encrypts Lambda environment variables at rest. While that works, the better solution is to use AWS Secrets Manager, so keep that in mind; a sketch of that approach follows the Terraform below. The other environment variables (HISTORY_DAYS, DATA_BUCKET) allow me to change lambda behavior without changing code.

data "aws_iam_policy_document" "extract_market_data_document" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

data "aws_iam_policy_document" "aws_iam_extract_market_data_aws_lambda_iam_policy_document" {
  statement {
    effect = "Allow"
    resources = ["*"]
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
  }

  statement {
    effect = "Allow"
    resources = [
      aws_s3_bucket.data_bucket.arn,
      "${aws_s3_bucket.data_bucket.arn}/*"
    ]
    actions = [
      "s3:GetObject",
      "s3:GetObjectVersion",
      "s3:GetObjectAcl",
      "s3:GetObjectTagging",
      "s3:PutObject",
      "s3:PutObjectAcl",
      "s3:DeleteObject",
      "s3:DeleteObjectVersion",
      "s3:PutObjectTagging"
    ]
  }
}

resource "aws_iam_role" "iam_for_lambda" {
  name = "iam_for_lambda"
  assume_role_policy = data.aws_iam_policy_document.extract_market_data_document.json
}

resource "aws_iam_role_policy" "aws_lambda_iam_policy" {
  policy = data.aws_iam_policy_document.aws_iam_extract_market_data_aws_lambda_iam_policy_document.json
  role = aws_iam_role.iam_for_lambda.id
}

resource "aws_s3_object" "s3_object_upload" {
  depends_on = [aws_s3_bucket.code_bucket]
  bucket = aws_s3_bucket.code_bucket.bucket
  key    = var.lambda_filename
  source = var.file_location
  etag = filemd5(var.file_location)
}

resource "aws_lambda_function" "extract_market_data_aws_lambda" {
  depends_on = [aws_s3_object.s3_object_upload]
  function_name = var.lambda_function
  role          = aws_iam_role.iam_for_lambda.arn
  handler       = var.lambda_handler
  source_code_hash = filebase64sha256(var.file_location)
  s3_bucket     = aws_s3_bucket.code_bucket.bucket
  s3_key        = var.lambda_filename
  runtime       = var.runtime
  timeout       = var.timeout
  memory_size = 512
  environment {
    variables = {
      TZ = var.timezone
      DATA_BUCKET = aws_s3_bucket.data_bucket.bucket
      SYMBOLS = var.symbols_filename
      HISTORY_DAYS = var.history_days
      ALPACA_API_KEY = var.alpacaApiKey
      ALPACA_SECRET_KEY = var.alpacaSecretKey
    }
  }
}

resource "aws_cloudwatch_event_rule" "event_rule" {
  name = "event_rule"
  schedule_expression = var.cron_friday_after_market
}

resource "aws_cloudwatch_event_target" "event_target" {
  arn  = aws_lambda_function.extract_market_data_aws_lambda.arn
  rule = aws_cloudwatch_event_rule.event_rule.name
  target_id = aws_lambda_function.extract_market_data_aws_lambda.function_name
}

resource "aws_lambda_permission" "lambda_permission" {
  statement_id = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.extract_market_data_aws_lambda.function_name
  principal     = "events.amazonaws.com"
  source_arn = aws_cloudwatch_event_rule.event_rule.arn
}
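
As mentioned above, Secrets Manager is a better home for the Alpaca keys than environment variables. A minimal sketch of reading them at runtime with the AWS SDK v2 is below; the secret name is hypothetical, and the lambda's IAM role would also need secretsmanager:GetSecretValue on that secret.

import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

public class AlpacaSecrets {

  // Returns the raw secret string; the caller parses out the key/secret fields.
  public static String fetchSecret(String secretId) {
    try (SecretsManagerClient client = SecretsManagerClient.create()) {
      GetSecretValueRequest request = GetSecretValueRequest.builder()
          .secretId(secretId)
          .build();
      return client.getSecretValue(request).secretString();
    }
  }

  public static void main(String[] args) {
    // "alpaca/credentials" is a hypothetical secret name
    String json = fetchSecret("alpaca/credentials");
    System.out.println("Fetched " + json.length() + " characters of secret material");
  }
}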

Java Code

This next section is the Java code that occupies the lambda defined above.

StorageStockBar

The OHLC data from Alpaca does not contain the symbol. This class transforms the extracted data from the data source into something I can store.

package com.darylmathison.market.model;

import java.time.ZonedDateTime;

@lombok.Data
@lombok.Builder
public class StorageStockBar {
  private String symbol;
  private ZonedDateTime timestamp;
  private double open;
  private double high;
  private double low;
  private double close;
  private double volume;
}

PriceDataDTOImpl

In a Spring application, the DTO layer pulls data from the data source (Extract) and turns it into what the application needs (Transform). Most of the time, a DTO class calls a database, but it can call other services as well. This class does minimal transformation, turning what comes from Alpaca into a StorageStockBar.

package com.darylmathison.market.dto.impl;


import com.darylmathison.market.dto.PriceDataDTO;
import com.darylmathison.market.model.StorageStockBar;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import net.jacobpeterson.alpaca.AlpacaAPI;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.common.historical.bar.enums.BarTimePeriod;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.stock.historical.bar.MultiStockBarsResponse;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.stock.historical.bar.enums.BarAdjustment;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.stock.historical.bar.enums.BarFeed;
import org.springframework.stereotype.Service;

@Service
public class PriceDataDTOImpl implements PriceDataDTO {

  private final AlpacaAPI alpacaAPI;

  public PriceDataDTOImpl(AlpacaAPI alpacaAPI) {
    this.alpacaAPI = alpacaAPI;
  }

  @Override
  public List<StorageStockBar> getPriceData(List<String> symbols, LocalDate start, LocalDate end)
      throws Exception {
    List<StorageStockBar> allData = new ArrayList<>();

    ZonedDateTime requestStart = toZoneDateTime(start);
    ZonedDateTime requestEnd = toZoneDateTime(end);
    // Page through the results: keep requesting with the next page token until
    // the API stops returning one, so the last page is not dropped.
    String pageToken = null;
    do {
      MultiStockBarsResponse barsResponse = alpacaAPI.stockMarketData()
          .getBars(symbols, requestStart, requestEnd, null, pageToken, 15, BarTimePeriod.MINUTE,
              BarAdjustment.RAW, BarFeed.IEX);
      barsResponse.getBars().forEach((symbol, bars) -> bars.forEach(bar -> allData.add(
          StorageStockBar.builder().symbol(symbol).timestamp(bar.getTimestamp()).open(bar.getOpen())
              .close(bar.getClose()).high(bar.getHigh()).low(bar.getLow())
              // note: the bar's trade count is what gets stored in the volume column
              .volume(bar.getTradeCount()).build())));
      pageToken = barsResponse.getNextPageToken();
    } while (pageToken != null);

    return allData;
  }

  private ZonedDateTime toZoneDateTime(LocalDate localDate) {
    return localDate.atStartOfDay(ZoneId.of("America/New_York"));
  }
}

S3ServiceImpl

It can be a pain to deal with the S3 interface, so I created this service to make it less painful. The class has the ability to getObject and putObject. I added fetchList as a convenience method; it assumes the object being read is a list of Strings with each entry on a separate line.

package com.darylmathison.market.service.impl;

import com.darylmathison.market.service.S3Service;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.util.List;

/**
 * Implementation of S3Service for interacting with Amazon S3.
 */
@Service
public class S3ServiceImpl implements S3Service {

    private final ObjectFactory<S3Client> s3ClientFactory;

    public S3ServiceImpl(ObjectFactory<S3Client> s3ClientFactory) {
        this.s3ClientFactory = s3ClientFactory;
    }

    /**
     * Fetches a list of strings from an S3 object.
     *
     * @param bucket The S3 bucket name
     * @param key The S3 object key
     * @return List of strings read from the S3 object
     */
    @Override
    public List<String> fetchList(String bucket, String key) {

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(getObject(bucket, key))))) {

            return reader.lines().toList();
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch list from S3: " + bucket + "/" + key, e);
        }
    }

    /**
     * Gets an object from S3 as a byte array.
     *
     * @param bucket The S3 bucket name
     * @param key The S3 object key
     * @return The object content as byte array
     */
    @Override
    public byte[] getObject(String bucket, String key) {
        try (S3Client s3Client = s3ClientFactory.getObject()) {
            GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                    .bucket(bucket)
                    .key(key)
                    .build();

            ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(getObjectRequest);
            return objectBytes.asByteArray();
        } catch (Exception e) {
            throw new RuntimeException("Failed to get object from S3: " + bucket + "/" + key, e);
        }
    }

    /**
     * Puts an object into S3.
     *
     * @param bucket The S3 bucket name
     * @param key The S3 object key
     * @param data The data to store in S3
     */
    @Override
    public void putObject(String bucket, String key, byte[] data) {
        try (S3Client s3Client = s3ClientFactory.getObject()) {
            PutObjectRequest putObjectRequest = PutObjectRequest.builder()
                    .bucket(bucket)
                    .key(key)
                    .build();

            s3Client.putObject(putObjectRequest,
                    software.amazon.awssdk.core.sync.RequestBody.fromBytes(data));
        } catch (Exception e) {
            throw new RuntimeException("Failed to put object to S3: " + bucket + "/" + key, e);
        }
    }
}

StockPriceServiceImpl

This is where the main work happens. The service gets the data, turns it into CSV format, and compresses it with gzip. The csv.gz format is understood by AWS Glue, so it gives me options on how to process the data later. After the data is extracted and transformed, it is loaded into the data S3 bucket, completing the ETL process.

package com.darylmathison.market.service.impl;

import com.darylmathison.market.service.StockPriceService;
import com.darylmathison.market.model.StorageStockBar;
import com.darylmathison.market.dto.PriceDataDTO;
import com.darylmathison.market.service.S3Service;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.logging.Logger;
import java.util.logging.Level;
import java.util.zip.GZIPOutputStream;

/**
 * Implementation of StockPriceService for downloading and storing stock price data.
 */
@Service
public class StockPriceServiceImpl implements StockPriceService {

    private static final Logger logger = Logger.getLogger(StockPriceServiceImpl.class.getName());

    @Value("${data.bucket.name}")
    private String dataBucketName;

    @Value("${data.symbols.file}")
    private String symbolsFileKey;

    @Value("${history.days:7}")
    private int historyDays;

    private final PriceDataDTO priceDataDTO;
    private final S3Service s3Service;

    public StockPriceServiceImpl(PriceDataDTO priceDataDTO, S3Service s3Service) {
        this.priceDataDTO = priceDataDTO;
        this.s3Service = s3Service;
    }

    /**
     * Downloads price data for configured symbols and stores it in S3.
     *
     * @return Number of price records processed
     */
    @Override
    public int getPriceData() {
        try {
            logger.info("Starting price data download process");

            // Get symbols from S3
            List<String> symbols = s3Service.fetchList(dataBucketName, symbolsFileKey);
            logger.info("Fetched " + symbols.size() + " symbols from S3");

            // Calculate date range
            LocalDate endDate = LocalDate.now();
            LocalDate startDate = endDate.minusDays(historyDays);
            logger.info("Downloading price data from " + startDate + " to " + endDate);

            // Get price data
            List<StorageStockBar> priceData = priceDataDTO.getPriceData(symbols, startDate, endDate);
            logger.info("Retrieved " + priceData.size() + " price records");

            // Convert to compressed CSV
            byte[] compressedData = generateCompressedCSV(priceData);
            logger.info("Generated compressed CSV data: " + compressedData.length + " bytes");

            // Create S3 key with today's date
            String key = generateS3Key(endDate);

            // Upload to S3
            s3Service.putObject(dataBucketName, key, compressedData);
            logger.info("Successfully uploaded price data to S3: " + key);

            return priceData.size();
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to process price data: " + e.getMessage(), e);
            throw new RuntimeException("Failed to process price data: " + e.getMessage(), e);
        }
    }

    /**
     * Generates an S3 key in the format "stock_prices/prices_{year}-{month}-{day}.csv.gz"
     */
    private String generateS3Key(LocalDate date) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        return String.format("stock_prices/prices_%s.csv.gz", date.format(formatter));
    }

    /**
     * Converts a list of StorageStockBar objects to a compressed CSV byte array.
     */
    private byte[] generateCompressedCSV(List<StorageStockBar> priceData) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzipOut = new GZIPOutputStream(out);
        OutputStreamWriter writer = new OutputStreamWriter(gzipOut);

        try (CSVPrinter printer = new CSVPrinter(writer,
            CSVFormat.DEFAULT.builder().setHeader("symbol", "timestamp", "open", "high", "low", "close", "volume").get())) {
            for (StorageStockBar bar : priceData) {
                printer.printRecord(
                    bar.getSymbol(),
                    bar.getTimestamp(),
                    bar.getOpen(),
                    bar.getHigh(),
                    bar.getLow(),
                    bar.getClose(),
                    bar.getVolume()
                );
            }
        }
        gzipOut.finish();
        return out.toByteArray();
    }
}

StockPriceLambdaHandler

This class starts up the SpringApplication and reads in any configuration files. After the SpringApplication is initialized, the StockPriceService bean is retrieved and StockPriceService::getPriceData() is called.

package com.darylmathison.market.handler;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.darylmathison.market.SpringConfig;
import com.darylmathison.market.service.StockPriceService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.logging.Logger;
import java.util.logging.Level;


@SpringBootApplication
@EnableConfigurationProperties
public class StockPriceLambdaHandler implements RequestHandler<Object, String> {

  private static final Logger logger = Logger.getLogger(StockPriceLambdaHandler.class.getName());

  /**
   * Handle the Lambda request.
   *
   * @param input   The input for the Lambda function (can be passed as JSON).
   * @param context Lambda execution context.
   * @return A JSON string representing the result of the price data download.
   */
  @Override
  public String handleRequest(Object input, Context context) {
    try {
      SpringApplication app = new SpringApplication(SpringConfig.class);
      app.setWebApplicationType(WebApplicationType.NONE);

      if (context != null) {
        context.getLogger().log("Starting application context");
      }

      ConfigurableApplicationContext applicationContext = app.run();
      StockPriceService stockPriceService = applicationContext.getBean(StockPriceService.class);

      int recordCount = stockPriceService.getPriceData();
      logger.info("Downloaded " + recordCount + " stock price records");
      return String.format("{\"success\": true, \"recordsProcessed\": %d}", recordCount);
    } catch (Exception e) {
      // Lambda error handling
      logger.log(Level.SEVERE, "Failed to process request", e);
      return String.format("{\"success\": false, \"error\": \"%s\"}", e.getMessage());
    }
  }
}
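
The handler boots a class called SpringConfig that is not shown in this post; the real thing lives in the linked repo and may differ. A minimal sketch of what such a configuration could look like, assuming the AlpacaAPI and S3Client beans are built from the Lambda environment variables and that alpaca-java's two-argument key/secret constructor is used:

package com.darylmathison.market;

import net.jacobpeterson.alpaca.AlpacaAPI;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import software.amazon.awssdk.services.s3.S3Client;

// Sketch only: assumes the beans are wired from the environment variables set in the Terraform.
@Configuration
@ComponentScan("com.darylmathison.market")
public class SpringConfig {

  // Builds the Alpaca client from ALPACA_API_KEY/ALPACA_SECRET_KEY
  @Bean
  public AlpacaAPI alpacaAPI() {
    return new AlpacaAPI(System.getenv("ALPACA_API_KEY"), System.getenv("ALPACA_SECRET_KEY"));
  }

  // Prototype scope so the ObjectFactory in S3ServiceImpl hands back a fresh client
  // each call, since that service closes the client in a try-with-resources block.
  @Bean
  @Scope("prototype")
  public S3Client s3Client() {
    return S3Client.create();
  }
}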

Conclusion

In this post I have demonstrated a very simple stock price downloader. With a little work, it can download almost any dataset. Do you have a dataset you would like to save? Leave your answer in the comments below.

Anyone who wants to download the code and see for themselves can find it at https://github.com/darylmathison/aws-stock-price-download.
