The Problem
I have been involved with the stock markets ever since I was a child. While my dad spared me the down-and-dirty details, he was open about investing in the stock market. That early exposure gave me a natural fascination with the markets. There are many places to look at graphs, but I would like to make my own graphs, and that requires data. The easiest way to have my own database is to collect data from a reliable source.

Designing a Cloud Native ETL Process
When designing an ETL process, one must account for:
- What data is being gathered?
- Why is the data being used?
- Where will the data be stored?
- How to gather the data?
What data is being gathered?
This will be stock market data at 15-minute intervals from the prior week.
Why is the data being used?
This data will be used for historical stock analysis. While I know this does not guarantee future performance, I have found it to be a good predictor.
Where will the data be stored?
I could put the data directly into a database, considering I run a PostgreSQL instance on my laptop. However, I am not always doing stock analysis, so keeping it outside of a database is good too. Considering historical data is not known for changing much, as long as the data is available and in a known format, that is good enough.

Implementation
Tech Stack
I will be using Java/Spring for the codebase. Terraform will be the IaC component, as it is the tool normally used by enterprise operations.
Cloud Native
I chose a cloud native solution because it allows me to do other things on my laptop besides wondering whether my download process is running and whether it is safe to restart my laptop. I will be using AWS because I am already a customer of that cloud provider and I am familiar with the services I will be using.
Final Data Form
Market data for me tends to be on the cool side until I need it. For that reason, I will be taking the data from my brokerage and converting it to a csv.gz file. This file will be stored in an AWS S3 bucket until I need it.
Process flow
The lambda will be triggered every Friday at 5pm US/Eastern. It will load the list of symbols from the file named "symbols.txt" in my data bucket. That list of symbols will form a request for the last 5 days (M-F) of OHLC (Open, High, Low, Close) prices at 15-minute intervals. After the request is fulfilled, the data will be converted to CSV, with each row of the form symbol, timestamp, open, high, low, close, volume. After the CSV transformation, the data will be gzipped. I like the csv.gz format because AWS Glue understands it natively and gzip has good support in Java. The compressed text will be stored in an S3 bucket. For those who like pictures, I have a diagram below.
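To make the row layout concrete, here is a purely illustrative fragment of what one of the finished files contains once unzipped. The symbols, prices, and exact timestamp rendering are invented for the example:

symbol,timestamp,open,high,low,close,volume
AAPL,2025-01-03T14:30Z,243.10,243.55,242.90,243.40,1200
AAPL,2025-01-03T14:45Z,243.40,243.80,243.20,243.60,950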

All of the code displayed here can be found at https://github.com/darylmathison/aws-stock-price-download. I will be breaking it all down into the Terraform code and the Java code.
Terraform Code
While this is a small project, I still broke the configuration up into separate files to keep it organized.
main.tf
This lays out the basics of the providers and any extra modules to be brought in.
terraform {
  required_version = ">= 1.0.0" # Ensure that the Terraform version is 1.0.0 or higher

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.98.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "3.7.2"
    }
  }
}

provider "aws" {
  region = var.region
}
s3.tf
This file creates two buckets: one for code to be uploaded into lambdas, named "code," and one for data, named "data." The code bucket holds the packaged code that gets incorporated into a lambda. The data bucket holds the downloaded data, along with the list of stocks I wish to keep track of.
resource "random_id" "bucket_suffix" {
  byte_length = 10
}

resource "aws_s3_bucket" "code_bucket" {
  bucket = "${var.code_bucket}-${random_id.bucket_suffix.hex}"
}

resource "aws_s3_bucket" "data_bucket" {
  bucket = "${var.data_bucket_name}-${random_id.bucket_suffix.hex}"
}
data.tf
This file keeps all of the static data objects and where they get uploaded. I only have the symbols.txt file right now.
resource "aws_s3_object" "symbols_upload" {
  depends_on = [aws_s3_bucket.data_bucket]
  bucket     = aws_s3_bucket.data_bucket.bucket
  key        = var.symbols_filename
  source     = var.symbols_location
  etag       = filemd5(var.symbols_location)
}
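For reference, symbols.txt is nothing fancy: a plain text file with one ticker symbol per line, which is the shape the fetchList method shown later expects. The symbols here are just an example:

AAPL
MSFT
SPY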
lambda.tf
This defines the lambda that will get filled with Java code. The majority of the file defines the permissions it will need to perform its task. I have learned the hard way to set the preferred timezone in the environment variables. One may have noticed I used environment variables for the Alpaca API key. Last I checked, AWS does keep environment variables encrypted at rest. While that works, the better solution is to use AWS Secrets Manager, so keep that in mind (a sketch of what reading the key from Secrets Manager could look like follows the Terraform below). The other environment variables (HISTORY_DAYS, DATA_BUCKET) allow me to change lambda behavior without changing code.
data "aws_iam_policy_document" "extract_market_data_document" {
  statement {
    effect = "Allow"

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

data "aws_iam_policy_document" "aws_iam_extract_market_data_aws_lambda_iam_policy_document" {
  statement {
    effect    = "Allow"
    resources = ["*"]
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
  }

  statement {
    effect = "Allow"
    resources = [
      aws_s3_bucket.data_bucket.arn,
      "${aws_s3_bucket.data_bucket.arn}/*"
    ]
    actions = [
      "s3:GetObject",
      "s3:GetObjectVersion",
      "s3:GetObjectAcl",
      "s3:GetObjectTagging",
      "s3:PutObject",
      "s3:PutObjectAcl",
      "s3:DeleteObject",
      "s3:DeleteObjectVersion",
      "s3:PutObjectTagging"
    ]
  }
}

resource "aws_iam_role" "iam_for_lambda" {
  name               = "iam_for_lambda"
  assume_role_policy = data.aws_iam_policy_document.extract_market_data_document.json
}

resource "aws_iam_role_policy" "aws_lambda_iam_policy" {
  policy = data.aws_iam_policy_document.aws_iam_extract_market_data_aws_lambda_iam_policy_document.json
  role   = aws_iam_role.iam_for_lambda.id
}

resource "aws_s3_object" "s3_object_upload" {
  depends_on = [aws_s3_bucket.code_bucket]
  bucket     = aws_s3_bucket.code_bucket.bucket
  key        = var.lambda_filename
  source     = var.file_location
  etag       = filemd5(var.file_location)
}

resource "aws_lambda_function" "extract_market_data_aws_lambda" {
  depends_on       = [aws_s3_object.s3_object_upload]
  function_name    = var.lambda_function
  role             = aws_iam_role.iam_for_lambda.arn
  handler          = var.lambda_handler
  source_code_hash = filebase64sha256(var.file_location)
  s3_bucket        = aws_s3_bucket.code_bucket.bucket
  s3_key           = var.lambda_filename
  runtime          = var.runtime
  timeout          = var.timeout
  memory_size      = 512

  environment {
    variables = {
      TZ                = var.timezone
      DATA_BUCKET       = aws_s3_bucket.data_bucket.bucket
      SYMBOLS           = var.symbols_filename
      HISTORY_DAYS      = var.history_days
      ALPACA_API_KEY    = var.alpacaApiKey
      ALPACA_SECRET_KEY = var.alpacaSecretKey
    }
  }
}

resource "aws_cloudwatch_event_rule" "event_rule" {
  name                = "event_rule"
  schedule_expression = var.cron_friday_after_market
}

resource "aws_cloudwatch_event_target" "event_target" {
  arn       = aws_lambda_function.extract_market_data_aws_lambda.arn
  rule      = aws_cloudwatch_event_rule.event_rule.name
  target_id = aws_lambda_function.extract_market_data_aws_lambda.function_name
}

resource "aws_lambda_permission" "lambda_permission" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.extract_market_data_aws_lambda.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.event_rule.arn
}
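To expand on the Secrets Manager remark above, here is a minimal sketch (not part of this project) of how the lambda could fetch the Alpaca credentials at startup instead of reading environment variables. The secret name "alpaca/api-keys" is hypothetical, the lambda role would also need the secretsmanager:GetSecretValue permission, and a JSON-formatted secret would still need parsing.

import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;
import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest;

public class AlpacaSecretLoader {

  // Fetches the raw secret string stored under the given name.
  // Example usage: String apiKey = AlpacaSecretLoader.loadSecret("alpaca/api-keys");
  // The secret name is purely illustrative.
  public static String loadSecret(String secretName) {
    try (SecretsManagerClient client = SecretsManagerClient.builder().build()) {
      GetSecretValueRequest request = GetSecretValueRequest.builder()
          .secretId(secretName)
          .build();
      return client.getSecretValue(request).secretString();
    }
  }
}

A loader like this could replace the ALPACA_API_KEY and ALPACA_SECRET_KEY environment variables in the Terraform above.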
Java Code
This next section is the Java code that occupies the lambda defined above.
StorageStockBar
The OHLC data from Alpaca does not contain the symbol. This class transforms the extracted data from the data source into something I can store.
package com.darylmathison.market.model;

import java.time.ZonedDateTime;

@lombok.Data
@lombok.Builder
public class StorageStockBar {

  private String symbol;
  private ZonedDateTime timestamp;
  private double open;
  private double high;
  private double low;
  private double close;
  private double volume;
}
PriceDataDTOImpl
In a Spring application, the DTO layer pulls data from the data source (Extract) and turns it into what the application needs (Transform). Most of the time, a DTO class will call a database, but it can call other services. This class does minimal transformation by turning what comes from Alpaca into a StorageStockBar.
package com.darylmathison.market.dto.impl;

import com.darylmathison.market.dto.PriceDataDTO;
import com.darylmathison.market.model.StorageStockBar;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import net.jacobpeterson.alpaca.AlpacaAPI;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.common.historical.bar.enums.BarTimePeriod;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.stock.historical.bar.MultiStockBarsResponse;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.stock.historical.bar.enums.BarAdjustment;
import net.jacobpeterson.alpaca.model.endpoint.marketdata.stock.historical.bar.enums.BarFeed;
import org.springframework.stereotype.Service;

@Service
public class PriceDataDTOImpl implements PriceDataDTO {

  private final AlpacaAPI alpacaAPI;

  public PriceDataDTOImpl(AlpacaAPI alpacaAPI) {
    this.alpacaAPI = alpacaAPI;
  }

  @Override
  public List<StorageStockBar> getPriceData(List<String> symbols, LocalDate start, LocalDate end)
      throws Exception {
    List<StorageStockBar> allData = new ArrayList<>();
    ZonedDateTime requestStart = toZoneDateTime(start);
    ZonedDateTime requestEnd = toZoneDateTime(end);

    // Page through the results until Alpaca stops returning a next-page token.
    String nextPageToken = null;
    do {
      MultiStockBarsResponse barsResponse = alpacaAPI.stockMarketData()
          .getBars(symbols, requestStart, requestEnd, null, nextPageToken, 15,
              BarTimePeriod.MINUTE, BarAdjustment.RAW, BarFeed.IEX);
      barsResponse.getBars().forEach((symbol, bars) -> bars.forEach(bar -> allData.add(
          StorageStockBar.builder()
              .symbol(symbol)
              .timestamp(bar.getTimestamp())
              .open(bar.getOpen())
              .high(bar.getHigh())
              .low(bar.getLow())
              .close(bar.getClose())
              // the volume field is populated with the bar's trade count
              .volume(bar.getTradeCount())
              .build())));
      nextPageToken = barsResponse.getNextPageToken();
    } while (nextPageToken != null);

    return allData;
  }

  private ZonedDateTime toZoneDateTime(LocalDate localDate) {
    return localDate.atStartOfDay(ZoneId.of("America/New_York"));
  }
}
S3ServiceImpl
It can be a pain to deal with the S3 interface, so I created this service to make it less painful. The class has the ability to getObject and putObject. I added fetchList as a convenience method; it assumes the object being read is a list of Strings with each entry on a separate line.
package com.darylmathison.market.service.impl;

import com.darylmathison.market.service.S3Service;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.stereotype.Service;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
import java.util.List;

/**
 * Implementation of S3Service for interacting with Amazon S3.
 */
@Service
public class S3ServiceImpl implements S3Service {

  private final ObjectFactory<S3Client> s3ClientFactory;

  public S3ServiceImpl(ObjectFactory<S3Client> s3ClientFactory) {
    this.s3ClientFactory = s3ClientFactory;
  }

  /**
   * Fetches a list of strings from an S3 object.
   *
   * @param bucket The S3 bucket name
   * @param key The S3 object key
   * @return List of strings read from the S3 object
   */
  @Override
  public List<String> fetchList(String bucket, String key) {
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new ByteArrayInputStream(getObject(bucket, key))))) {
      return reader.lines().toList();
    } catch (Exception e) {
      throw new RuntimeException("Failed to fetch list from S3: " + bucket + "/" + key, e);
    }
  }

  /**
   * Gets an object from S3 as a byte array.
   *
   * @param bucket The S3 bucket name
   * @param key The S3 object key
   * @return The object content as byte array
   */
  @Override
  public byte[] getObject(String bucket, String key) {
    try (S3Client s3Client = s3ClientFactory.getObject()) {
      GetObjectRequest getObjectRequest = GetObjectRequest.builder()
          .bucket(bucket)
          .key(key)
          .build();
      ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(getObjectRequest);
      return objectBytes.asByteArray();
    } catch (Exception e) {
      throw new RuntimeException("Failed to get object from S3: " + bucket + "/" + key, e);
    }
  }

  /**
   * Puts an object into S3.
   *
   * @param bucket The S3 bucket name
   * @param key The S3 object key
   * @param data The data to store in S3
   */
  @Override
  public void putObject(String bucket, String key, byte[] data) {
    try (S3Client s3Client = s3ClientFactory.getObject()) {
      PutObjectRequest putObjectRequest = PutObjectRequest.builder()
          .bucket(bucket)
          .key(key)
          .build();
      s3Client.putObject(putObjectRequest,
          software.amazon.awssdk.core.sync.RequestBody.fromBytes(data));
    } catch (Exception e) {
      throw new RuntimeException("Failed to put object to S3: " + bucket + "/" + key, e);
    }
  }
}
StockPriceServiceImpl
This is where the main work happens. The service gets the data, turns it into CSV format, and compresses it all with gzip. The csv.gz format is understood by AWS Glue, so this format gives me options on how to process the data. After the data is extracted and transformed, it is loaded into the data S3 bucket, completing the ETL process.
package com.darylmathison.market.service.impl;

import com.darylmathison.market.service.StockPriceService;
import com.darylmathison.market.model.StorageStockBar;
import com.darylmathison.market.dto.PriceDataDTO;
import com.darylmathison.market.service.S3Service;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.logging.Logger;
import java.util.logging.Level;
import java.util.zip.GZIPOutputStream;

/**
 * Implementation of StockPriceService for downloading and storing stock price data.
 */
@Service
public class StockPriceServiceImpl implements StockPriceService {

  private static final Logger logger = Logger.getLogger(StockPriceServiceImpl.class.getName());

  @Value("${data.bucket.name}")
  private String dataBucketName;

  @Value("${data.symbols.file}")
  private String symbolsFileKey;

  @Value("${history.days:7}")
  private int historyDays;

  private final PriceDataDTO priceDataDTO;
  private final S3Service s3Service;

  public StockPriceServiceImpl(PriceDataDTO priceDataDTO, S3Service s3Service) {
    this.priceDataDTO = priceDataDTO;
    this.s3Service = s3Service;
  }

  /**
   * Downloads price data for configured symbols and stores it in S3.
   *
   * @return Number of price records processed
   */
  @Override
  public int getPriceData() {
    try {
      logger.info("Starting price data download process");

      // Get symbols from S3
      List<String> symbols = s3Service.fetchList(dataBucketName, symbolsFileKey);
      logger.info("Fetched " + symbols.size() + " symbols from S3");

      // Calculate date range
      LocalDate endDate = LocalDate.now();
      LocalDate startDate = endDate.minusDays(historyDays);
      logger.info("Downloading price data from " + startDate + " to " + endDate);

      // Get price data
      List<StorageStockBar> priceData = priceDataDTO.getPriceData(symbols, startDate, endDate);
      logger.info("Retrieved " + priceData.size() + " price records");

      // Convert to compressed CSV
      byte[] compressedData = generateCompressedCSV(priceData);
      logger.info("Generated compressed CSV data: " + compressedData.length + " bytes");

      // Create S3 key with today's date
      String key = generateS3Key(endDate);

      // Upload to S3
      s3Service.putObject(dataBucketName, key, compressedData);
      logger.info("Successfully uploaded price data to S3: " + key);

      return priceData.size();
    } catch (Exception e) {
      logger.log(Level.SEVERE, "Failed to process price data: " + e.getMessage(), e);
      throw new RuntimeException("Failed to process price data: " + e.getMessage(), e);
    }
  }

  /**
   * Generates an S3 key in the format "stock_prices/prices_{year}-{month}-{day}.csv.gz"
   */
  private String generateS3Key(LocalDate date) {
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    return String.format("stock_prices/prices_%s.csv.gz", date.format(formatter));
  }

  /**
   * Converts a list of StorageStockBar objects to a compressed CSV byte array.
   */
  private byte[] generateCompressedCSV(List<StorageStockBar> priceData) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzipOut = new GZIPOutputStream(out);
    OutputStreamWriter writer = new OutputStreamWriter(gzipOut);
    try (CSVPrinter printer = new CSVPrinter(writer, CSVFormat.DEFAULT.builder()
        .setHeader("symbol", "timestamp", "open", "high", "low", "close", "volume").get())) {
      for (StorageStockBar bar : priceData) {
        printer.printRecord(
            bar.getSymbol(),
            bar.getTimestamp(),
            bar.getOpen(),
            bar.getHigh(),
            bar.getLow(),
            bar.getClose(),
            bar.getVolume()
        );
      }
    }
    gzipOut.finish();
    return out.toByteArray();
  }
}
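The post stops at loading the file into S3, but since the whole point is to use the data later, here is a minimal sketch of reading one of the generated files back with the same libraries the project already uses (commons-csv and java.util.zip). The local file name is a placeholder; in practice the object would first be pulled from the data bucket, for example with the getObject method above.

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.zip.GZIPInputStream;

public class PriceFileReader {

  // Reads a csv.gz file produced by StockPriceServiceImpl and prints each bar's close price.
  // The default file name is purely illustrative.
  public static void main(String[] args) throws Exception {
    String path = args.length > 0 ? args[0] : "prices_2025-01-03.csv.gz";
    try (Reader reader = new InputStreamReader(new GZIPInputStream(new FileInputStream(path)));
         CSVParser parser = CSVParser.parse(reader, CSVFormat.DEFAULT.builder()
             .setHeader("symbol", "timestamp", "open", "high", "low", "close", "volume")
             .setSkipHeaderRecord(true)
             .get())) {
      for (CSVRecord record : parser) {
        System.out.println(record.get("symbol") + " " + record.get("timestamp")
            + " close=" + record.get("close"));
      }
    }
  }
}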
StockPriceLambdaHandler
This class starts up a SpringApplication and reads in any configuration files. After the SpringApplication is initialized, the StockPriceService bean is retrieved and StockPriceService::getPriceData() is called.
package com.darylmathison.market.handler;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.darylmathison.market.SpringConfig;
import com.darylmathison.market.service.StockPriceService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.logging.Logger;
import java.util.logging.Level;

@SpringBootApplication
@EnableConfigurationProperties
public class StockPriceLambdaHandler implements RequestHandler<Object, String> {

  private static final Logger logger = Logger.getLogger(StockPriceLambdaHandler.class.getName());

  /**
   * Handle the Lambda request.
   *
   * @param input The input for the Lambda function (can be passed as JSON).
   * @param context Lambda execution context.
   * @return A JSON string representing the result of the price data download.
   */
  @Override
  public String handleRequest(Object input, Context context) {
    try {
      SpringApplication app = new SpringApplication(SpringConfig.class);
      app.setWebApplicationType(WebApplicationType.NONE);
      if (context != null) {
        context.getLogger().log("Starting application context");
      }
      ConfigurableApplicationContext applicationContext = app.run();

      StockPriceService stockPriceService = applicationContext.getBean(StockPriceService.class);
      int recordCount = stockPriceService.getPriceData();
      logger.info("Downloaded " + recordCount + " stock price records");

      return String.format("{\"success\": true, \"recordsProcessed\": %d}", recordCount);
    } catch (Exception e) {
      // Lambda error handling
      logger.log(Level.SEVERE, "Failed to process request", e);
      return String.format("{\"success\": false, \"error\": \"%s\"}", e.getMessage());
    }
  }
}
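SpringConfig itself is not shown in this post (it lives in the repository). As a rough idea of what it has to provide, here is a hedged sketch of a configuration class that wires the AlpacaAPI and S3Client beans the services above inject. The two-argument AlpacaAPI constructor and the prototype scope on the S3Client are assumptions for illustration, not necessarily what the repository does.

package com.darylmathison.market;

import net.jacobpeterson.alpaca.AlpacaAPI;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import software.amazon.awssdk.services.s3.S3Client;

@Configuration
@ComponentScan("com.darylmathison.market")
public class SpringConfig {

  // Builds the Alpaca client from the lambda's environment variables.
  // Assumes a two-argument constructor exists in the alpaca-java version in use.
  @Bean
  public AlpacaAPI alpacaAPI() {
    return new AlpacaAPI(System.getenv("ALPACA_API_KEY"), System.getenv("ALPACA_SECRET_KEY"));
  }

  // Prototype scope so S3ServiceImpl can safely close the client it obtains
  // from its ObjectFactory<S3Client> on every call.
  @Bean
  @Scope("prototype")
  public S3Client s3Client() {
    return S3Client.builder().build();
  }
}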
Conclusion
In this post I have demonstrated a very simple stock price downloader. With a little work, it could download almost any dataset. Do you have a dataset you would like to save? Leave your answer in the comments below.
Anyone who wants to download the code and see for themselves can find it at https://github.com/darylmathison/aws-stock-price-download.


