---
title: "Working with Large Data"
author: "Gilles Colling"
date: "`r Sys.Date()`"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Working with Large Data}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
```

## Introduction

R holds data.frames entirely in memory. A 10 GB CSV loaded with `read.csv()`
becomes a 10 GB R object, and often a larger one: R's internal representation of
character vectors and its per-object overhead can inflate the footprint to two to
three times the raw file size. On a machine with 16 GB of RAM, that 10 GB file
will either fail to load or push the system into swap, grinding everything to a
halt. "Larger-than-RAM" is not a theoretical concern. It is the normal state of
affairs for anyone working with multi-year survey data, remote sensing extracts,
or event logs from sensor networks.

vectra sidesteps this by never loading the full dataset. When we call `tbl()` on
a `.vtr` file or `tbl_csv()` on a CSV, nothing is read yet. The call to
`collect()`, `write_vtr()`, or any other terminal verb pulls data through the
plan tree one row group at a time. Each row group is a self-contained slice of
the dataset: a few thousand to a few hundred thousand rows with all their
columns. The engine reads one group, applies every operation in the pipeline,
emits the result, then discards the group and moves to the next. The key
distinction from R's default approach: data moves through the pipeline in
fixed-size pieces, and each piece is freed before the next one arrives.

This batch-at-a-time design means that most vectra operations use memory
proportional to a single batch, regardless of how many total rows exist. A `filter` node
reads a batch, applies a predicate, passes matching rows downstream, and
forgets about them. A `mutate` node reads a batch, computes new columns,
passes the result on. A `select` node simply drops columns from each batch.
None of these ever accumulate the full dataset in memory.

A few operations do need to see all the data before they can produce output.
Sorting requires a global ordering. Hash-based grouping needs to track every
distinct group key. Joins need a hash table of the build side. These
"materializing" operations have bounded memory budgets or known scaling
behaviour that we will discuss in detail.

The central implication is this: if we can express our workload as a chain of
streaming operations terminated by a streaming sink, the full dataset never
needs to fit in RAM. A 200 GB CSV can flow through `tbl_csv() |> filter() |>
mutate() |> write_vtr()` on a laptop with 8 GB of memory. The pipeline
processes each batch in isolation and writes the result directly to disk.

`collect()` is the one terminal verb that materializes everything into an R
data.frame. It breaks the streaming model: the entire result set crosses from C
into R and lives in R's heap until we remove it. For large data, we avoid it for
the final result and instead use `write_vtr()`, `write_csv()`, or
`write_sqlite()` as our streaming sinks. These functions pull batches through the
pipeline and write each one directly to disk, so the C engine's memory is
recycled on every iteration. The R process never holds more than one batch. We
still use `collect()` freely for small intermediate checks, filtered subsets,
and aggregation results that we know will fit in memory. The question to ask
before calling `collect()` is always: how many rows will this return? If the
answer is "roughly the same as the input," we want a streaming sink instead.

This vignette walks through the patterns and tools vectra provides for
working with datasets that exceed available RAM. We will build our examples
around a synthetic multi-year ecological survey dataset: species observations
recorded across monitoring sites over several years.

```{r synthetic-data}
library(vectra)

set.seed(42)

n <- 50000
species_pool <- c(
  "Quercus robur", "Fagus sylvatica", "Pinus sylvestris",
  "Betula pendula", "Acer pseudoplatanus", "Fraxinus excelsior",
  "Picea abies", "Tilia cordata", "Carpinus betulus",
  "Alnus glutinosa", "Sorbus aucuparia", "Ulmus glabra"
)
sites <- paste0("SITE_", sprintf("%03d", 1:50))
years <- 2015:2024

obs <- data.frame(
  obs_id    = seq_len(n),
  site      = sample(sites, n, replace = TRUE),
  species   = sample(species_pool, n, replace = TRUE),
  year      = sample(years, n, replace = TRUE),
  abundance = rpois(n, lambda = 15),
  cover_pct = round(runif(n, 0.1, 95.0), 1),
  quality   = sample(c("good", "moderate", "poor"), n,
                     replace = TRUE, prob = c(0.6, 0.3, 0.1)),
  stringsAsFactors = FALSE
)
```

We will write this as both CSV and `.vtr` to demonstrate different starting
points. In a real workflow the CSV might be a raw export from a database or
field data logger.

```{r write-sources}
csv_path <- tempfile(fileext = ".csv")
write.csv(obs, csv_path, row.names = FALSE)

vtr_path <- tempfile(fileext = ".vtr")
write_vtr(obs, vtr_path)
```

## Streaming pipelines

The core larger-than-RAM pattern in vectra is source, verbs, sink. We open a
lazy reference to the data, chain operations, and terminate with a write
function. No intermediate data.frame is ever created.

Suppose we want to extract only high-quality observations, compute a
derived column, and store the result as a new `.vtr` file. The entire pipeline
streams:

```{r streaming-basic}
clean_path <- tempfile(fileext = ".vtr")

tbl_csv(csv_path) |>
  filter(quality == "good") |>
  mutate(log_abundance = log(abundance + 1)) |>
  write_vtr(clean_path)
```

At no point did R hold 50,000 rows (or 50 million, if this were a larger
dataset) in memory. `tbl_csv()` created a lazy scan node. `filter()` and
`mutate()` each wrapped it in another lazy node. `write_vtr()` triggered
execution: the CSV scanner read a batch of rows, the filter discarded
non-matching rows, the mutate computed the log column, and the writer
serialized the result to disk. Then the next batch.

At the C level, each node in the pipeline exposes a `next_batch()` function. The
terminal node (the writer) calls `next_batch()` on its upstream node, which in
turn calls `next_batch()` on its own upstream, and so on down to the scan node
that actually reads from disk. Each `VecBatch` struct flows upward through the
chain, gets transformed at each stage, and arrives at the writer ready to be
serialized. Once the writer finishes with a batch, the memory is freed. The
pipeline then pulls the next batch. This pull-based design means memory
consumption stays flat regardless of how many millions of rows pass through. A
pipeline that processes 200 GB of CSV data uses the same peak memory as one that
processes 200 KB.
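
To make the pull model concrete, here is a rough sketch of the same idea in
plain R. It is purely illustrative (vectra's real nodes are C structs, not R
closures): each node is a function that returns its next batch of rows, or
`NULL` once its upstream is exhausted.

```{r pull-model-sketch}
# Illustrative only: model each node as a closure with a "next batch" call.
make_scan <- function(df, batch_size) {
  pos <- 0
  function() {
    if (pos >= nrow(df)) return(NULL)           # source exhausted
    idx <- (pos + 1):min(pos + batch_size, nrow(df))
    pos <<- max(idx)
    df[idx, , drop = FALSE]                     # emit one batch
  }
}

make_filter <- function(upstream, predicate) {
  function() {
    batch <- upstream()                         # pull from upstream
    if (is.null(batch)) return(NULL)
    batch[predicate(batch), , drop = FALSE]     # keep matching rows
  }
}

# A terminal node keeps pulling until the chain is exhausted.
scan_node   <- make_scan(obs, batch_size = 10000)
filter_node <- make_filter(scan_node, function(b) b$quality == "good")

rows_out <- 0
repeat {
  batch <- filter_node()
  if (is.null(batch)) break
  rows_out <- rows_out + nrow(batch)            # "write" the batch, then discard it
}
rows_out
```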

We can verify the result of the streaming pipeline by reading back a small slice:

```{r verify-streaming}
tbl(clean_path) |>
  select(obs_id, site, species, abundance, log_abundance) |>
  slice_head(n = 5) |>
  collect()
```

The same pattern works for any source-sink combination. CSV to SQLite:

```{r csv-to-sqlite}
db_path <- tempfile(fileext = ".sqlite")

tbl_csv(csv_path) |>
  filter(year >= 2020) |>
  write_sqlite(db_path, "recent_obs")
```

Or `.vtr` to CSV, useful for handing data to colleagues who expect plain text:

```{r vtr-to-csv}
export_csv <- tempfile(fileext = ".csv")

tbl(vtr_path) |>
  filter(species == "Fagus sylvatica") |>
  select(obs_id, site, year, abundance) |>
  write_csv(export_csv)
```

Each pipeline reads only what it needs and writes directly to the target
format. The intermediate verbs never buffer more than one batch. This is
what makes vectra practical for datasets larger than RAM: the memory
footprint of a streaming pipeline is determined by the batch size, not the
dataset size.

When the result of a pipeline *is* small enough for R (aggregation results,
filtered subsets, summary statistics), `collect()` is still the right choice.
Aggregations in particular tend to produce far fewer rows than the input:

```{r collect-agg}
tbl(vtr_path) |>
  group_by(species, year) |>
  summarise(
    n_obs     = n(),
    mean_abun = mean(abundance),
    mean_cov  = mean(cover_pct)
  ) |>
  arrange(species, year) |>
  collect() |>
  head(10)
```

The rule of thumb: if the output might be large, write to disk. If it fits
in a data.frame, collect.

## Batch sizing

A `.vtr` file is organised into row groups. Each row group is a self-contained
unit: it has its own column data, validity bitmaps, dictionary tables, and
zone-map min/max statistics. When vectra scans a file, it reads one row group
at a time. The number of rows per group is the batch size.

Batch size affects two things: memory per batch and zone-map pruning
granularity. Larger batches mean fewer row groups and less per-group
overhead, but each batch consumes more memory during processing. Smaller
batches mean the zone-map statistics cover a narrower range of values, so
filter predicates can skip more groups entirely.

The default batch size is 65,536 rows. This is a reasonable middle ground for
most workloads: large enough that per-batch overhead (reading the row group
header, allocating column arrays, setting up validity bitmaps) is amortized
across many rows, but small enough that each batch stays in the low megabytes
for typical column counts. When choosing a different batch size, the decision
depends on the query pattern. Analytical workloads that scan most of the file
(aggregations, full-table joins) benefit from large batches because they reduce
the number of `next_batch()` calls and the associated per-group bookkeeping.
Point queries that target a few matching rows benefit from small batches,
especially when combined with indexes or sorted columns, because the engine can
skip entire row groups whose zone-map ranges do not overlap the predicate.

For analytical workloads with selective filters, smaller row groups often pay
off. If each row group spans a narrow range of `year` values, a filter like
`year == 2023` can skip most groups without reading them. With one giant row
group, the filter must scan every row.

We control batch size when writing:

```{r batch-size-write}
small_groups <- tempfile(fileext = ".vtr")
large_groups <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  arrange(year) |>
  write_vtr(small_groups, batch_size = 5000)

tbl(vtr_path) |>
  arrange(year) |>
  write_vtr(large_groups, batch_size = 50000)
```

We sorted by `year` before writing so that row groups have tight year ranges.
The small-batch file has about 10 row groups; the large-batch file has one.
When we filter on `year`, the small-batch file can prune most groups using
zone maps:

```{r batch-size-compare}
tbl(small_groups) |>
  filter(year == 2023) |>
  explain()
```

CSV and SQLite sources also have a `batch_size` parameter, but it controls
how many rows the scanner reads per pull rather than how the file is
structured on disk. The default of 65,536 works well for most cases. Reducing
it lowers peak memory; increasing it reduces per-batch overhead for simple
pass-through pipelines.

```{r csv-batch-size}
tbl_csv(csv_path, batch_size = 10000) |>
  filter(quality == "good") |>
  slice_head(n = 3) |>
  collect()
```

As a starting point: 50,000 to 100,000 rows per group works well for datasets
with a few dozen columns. If the data is sorted on a frequently-filtered
column, lean towards smaller groups (10,000 to 50,000) for better pruning. If
the workload is dominated by full scans or aggregations, larger groups
(100,000+) reduce overhead.
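
As a rough back-of-envelope check on these numbers (ignoring string columns,
validity bitmaps, and per-column overhead), numeric columns cost about eight
bytes per value:

```{r batch-memory}
rows_per_batch <- 65536
n_columns      <- 10
bytes_per_val  <- 8   # double precision

rows_per_batch * n_columns * bytes_per_val / 1024^2   # megabytes per batch
```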

## Append workflows

Real datasets rarely arrive all at once. Monitoring data comes in daily,
monthly, or seasonally. `append_vtr()` adds new rows to an existing `.vtr`
file as a new row group, without touching existing data. The file header is
updated to reflect the additional group.

Here is a typical pattern: we start with one year of data and append
subsequent years as they arrive.

```{r append-init}
archive <- tempfile(fileext = ".vtr")
first_year <- obs[obs$year == 2015, ]
write_vtr(first_year, archive)

nrow(tbl(archive) |> collect())
```

Now we simulate receiving 2016 data:

```{r append-year}
year_2016 <- obs[obs$year == 2016, ]
append_vtr(year_2016, archive)

nrow(tbl(archive) |> collect())
```

`append_vtr()` also accepts a `vectra_node`, so we can stream data directly
from a CSV into an existing archive without loading it into R:

```{r append-streaming}
csv_2017 <- tempfile(fileext = ".csv")
write.csv(obs[obs$year == 2017, ], csv_2017, row.names = FALSE)

tbl_csv(csv_2017) |> append_vtr(archive)

nrow(tbl(archive) |> collect())
```

Each call to `append_vtr()` creates one new row group. Over time, the file
accumulates many groups of varying sizes. This is fine for append-heavy
workloads. If the number of groups becomes unwieldy (hundreds of small
appends, each adding a few rows), we can compact the file by rewriting it:

```{r compact}
compacted <- tempfile(fileext = ".vtr")
tbl(archive) |> write_vtr(compacted, batch_size = 50000)
```

This reads every row group from the archive, streams through the writer with
a fresh batch size, and produces a clean file. The old file can then be
replaced.

The schema of appended data must exactly match the target file: same column
names, same types, same order. If the schema drifts over time (a new column
added in 2018, say), we need to align it before appending. One approach is
to add the missing column with a default value in a `mutate()` step:

```{r append-schema}
new_data_node <- tbl_csv(csv_2017) |>
  mutate(observer = NA_character_)
```

This produces a node whose schema matches a hypothetical archive that includes
an `observer` column. The key point: because `append_vtr()` accepts nodes,
we can chain any transformation needed to match the target schema.

## Delete and tombstones

Sometimes we need to remove rows from an existing `.vtr` file. Perhaps a
batch of observations was flagged as erroneous, or privacy regulations
require deletion of certain records. `delete_vtr()` handles this without
rewriting the file.

Deletion works through a tombstone sidecar file. When we call
`delete_vtr(path, row_ids)`, vectra writes the specified row indices to a
`.del` file next to the `.vtr`. The original data file is never modified.
On the next `tbl()` call, the scan node reads the tombstone file and skips
those rows.

Row IDs are 0-based physical positions across the entire file. The first row of
the first row group is row 0, and the numbering continues across row-group
boundaries rather than restarting in each group.

```{r delete-basic}
del_demo <- tempfile(fileext = ".vtr")
write_vtr(obs[1:100, ], del_demo)

# Delete rows 0, 1, and 99 (first two and last)
delete_vtr(del_demo, c(0, 1, 99))

tbl(del_demo) |> collect() |> nrow()
```
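
The tombstone sidecar now sits next to the data file:

```{r delete-sidecar}
file.exists(paste0(del_demo, ".del"))
```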

Tombstone files are cumulative. Calling `delete_vtr()` again merges the new
indices with existing ones:

```{r delete-cumulative}
delete_vtr(del_demo, c(10, 11, 12))

tbl(del_demo) |> collect() |> nrow()
```

To undo all deletions, remove the `.del` file:

```{r delete-undo}
unlink(paste0(del_demo, ".del"))
tbl(del_demo) |> collect() |> nrow()
```

Tombstones are lightweight for sparse deletions. If we delete 1% of rows,
the overhead is negligible. But if deletions accumulate to a substantial
fraction of the file, scan performance degrades because the engine still
reads and discards the tombstoned rows. At that point, compaction makes
sense:

```{r delete-compact}
delete_vtr(del_demo, 0:49)
compacted_del <- tempfile(fileext = ".vtr")
tbl(del_demo) |> write_vtr(compacted_del)

nrow(tbl(compacted_del) |> collect())
```

The compacted file contains only the surviving rows, with no tombstone file.

## Diff between snapshots

Data pipelines often work with periodic snapshots. We receive yesterday's
extract and today's extract, and we need to know what changed. `diff_vtr()`
computes a key-based logical diff between two `.vtr` files.

The function takes the paths to the old and new files plus the name of a
key column. It streams both files and returns a list with two elements:
`$deleted` contains the key values that were present in the old file but
absent in the new one. `$added` is a lazy `vectra_node` of rows present
in the new file but not in the old.

```{r diff-setup}
snap_v1 <- tempfile(fileext = ".vtr")
snap_v2 <- tempfile(fileext = ".vtr")

# Version 1: observations 1-100
write_vtr(obs[1:100, ], snap_v1)

# Version 2: observations 51-150 (rows 1-50 removed, 101-150 added)
write_vtr(obs[51:150, ], snap_v2)
```

```{r diff-compute}
changes <- diff_vtr(snap_v1, snap_v2, "obs_id")

# Keys that disappeared
head(changes$deleted)
length(changes$deleted)
```

The deleted keys are the `obs_id` values 1 through 50. The `$added` element is a
lazy node pointing to the observations with `obs_id` 101 through 150:

```{r diff-added}
collect(changes$added) |> head()
```

Because `$added` is a lazy node, we can pipe it directly into further
processing. For instance, we might want to append only the truly new rows
to an archive:

```{r diff-append}
archive_diff <- tempfile(fileext = ".vtr")
write_vtr(obs[1:100, ], archive_diff)

changes$added |> append_vtr(archive_diff)

nrow(tbl(archive_diff) |> collect())
```

This is a common incremental-load pattern: diff today's snapshot against
yesterday's, then append only the additions. Combined with `delete_vtr()`
for removals, we can maintain an up-to-date archive without full rewrites.

Note that `diff_vtr()` performs a set-level diff on the key column. If a row
exists in both snapshots with the same key but different values in other
columns, it will not appear in either `$deleted` or `$added`. To detect
modifications, we would need to compare the full row content after
identifying shared keys.
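
If the shared-key rows are few enough to collect, a minimal sketch is to line up
the rows that appear in both snapshots and compare the remaining columns in
base R:

```{r diff-modified}
old_rows <- tbl(snap_v1) |> collect()
new_rows <- tbl(snap_v2) |> collect()

shared_keys <- intersect(old_rows$obs_id, new_rows$obs_id)
old_shared  <- old_rows[match(shared_keys, old_rows$obs_id), ]
new_shared  <- new_rows[match(shared_keys, new_rows$obs_id), ]

# Compare every non-key column; any mismatch marks the row as modified
unchanged <- rep(TRUE, length(shared_keys))
for (col in setdiff(names(old_shared), "obs_id")) {
  unchanged <- unchanged & (old_shared[[col]] == new_shared[[col]])
}
sum(!unchanged)  # 0 here: the shared rows are identical in both snapshots
```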

## External sort

Sorting a dataset that does not fit in memory requires an external merge sort.
vectra's `arrange()` handles this automatically. The engine maintains a
1 GB memory budget. As data flows through, it accumulates rows in memory.
When the budget is reached, the accumulated rows are sorted and written to
a temporary `.vtr` file on disk (a "sorted run"). After all input has been
consumed, a k-way merge reads from all sorted runs simultaneously using a
min-heap, producing the final sorted output one batch at a time.

From the user's perspective, none of this is visible. We just call `arrange()`:

```{r external-sort}
sorted_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  arrange(species, desc(abundance)) |>
  write_vtr(sorted_path)

tbl(sorted_path) |>
  select(species, abundance, site) |>
  slice_head(n = 8) |>
  collect()
```

The sorted output streams to disk. Peak memory stays bounded at the spill
budget (1 GB) plus overhead for the merge heap, regardless of input size.
For our 50,000-row example the data fits entirely in memory and no spill
occurs. But the same code would work on a 500 million row file; the sort
would produce multiple temporary runs and merge them transparently.

The 1 GB budget governs how much unsorted data the engine accumulates before
flushing to a temporary file. Each flush produces one sorted run. A 10 GB
dataset with a 1 GB budget produces roughly 10 runs. The final merge opens all
runs simultaneously and maintains a min-heap with one entry per run. At each
step, the smallest element across all runs is popped from the heap and emitted
as the next output row; the run that contributed it advances by one row and
re-inserts into the heap. Because the heap has only as many entries as there are
runs (not as many as there are rows), the merge phase uses very little memory.

Data that is already partially sorted produces fewer runs. If the input is
sorted on a prefix of the sort key, long stretches of rows will already be in
order, so the engine can absorb more rows before hitting the budget and
flushing. In the best case (fully sorted input), no spill occurs at all and
the sort reduces to a streaming pass-through. There is nothing to tune here: the
engine applies the budget internally and manages its spill files in the system
temp directory, cleaning them up after the merge completes.

Writing the sorted data to a new `.vtr` file also improves query performance.
Once the file is sorted on `species`, zone-map statistics on that column
become tight, and filters on species can skip most row groups. Sorting on a
frequently-queried column is one of the most effective optimizations
available.
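
We can see the effect with `explain()`, using a species-sorted copy written with
small row groups (the same pruning mechanism as in the batch-sizing example
above):

```{r sorted-pruning}
species_sorted <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  arrange(species) |>
  write_vtr(species_sorted, batch_size = 5000)

tbl(species_sorted) |>
  filter(species == "Picea abies") |>
  explain()
```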

## Streaming joins

vectra's join engine uses a build-right, probe-left strategy. The right-side
table is fully materialized into a hash table in memory. Then the left-side
table streams through, probing the hash table for matches one batch at a time.
This means the left side can be arbitrarily large, because only the right side needs
to fit in memory.

The natural pattern for large-data joins is: huge fact table on the left,
small dimension table on the right.

```{r join-setup}
# Small reference table: site metadata
site_meta <- data.frame(
  site      = sites,
  region    = sample(c("North", "South", "East", "West"),
                     length(sites), replace = TRUE),
  elevation = round(runif(length(sites), 100, 2500)),
  stringsAsFactors = FALSE
)
site_path <- tempfile(fileext = ".vtr")
write_vtr(site_meta, site_path)
```

```{r join-streaming}
enriched <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  left_join(tbl(site_path), by = "site") |>
  write_vtr(enriched)

tbl(enriched) |>
  select(obs_id, site, region, elevation, species) |>
  slice_head(n = 5) |>
  collect()
```

The 50 site metadata rows become the hash table (right side). The observation
table streams through on the left, each batch probing the hash table for its
`site` key. Memory usage is proportional to the site metadata, not the
observation count.

What happens when both sides are large? The right side still must fit in
memory, so we need to reduce it. The most direct approach is to filter
the right side before the join:

```{r join-filter-right}
joined_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  filter(year >= 2022) |>
  inner_join(
    tbl(site_path) |> filter(region == "North"),
    by = "site"
  ) |>
  write_vtr(joined_path)

nrow(tbl(joined_path) |> collect())
```

The filtered right side is much smaller, so the hash table stays small. The
left-side filter also reduces the number of rows that flow through the probe,
but that affects throughput rather than memory.

For self-joins or joins between two truly large tables, the options are more
constrained. The core problem is that one side must fit in memory as a hash
table. When neither side is small, we need a strategy to make one of them
small.

The most general approach is split-apply-combine: partition the left side by a
key (say, `year`), and for each partition, join against only the matching rows
from the right side. If both tables span 10 years, each partition pair is
roughly 1/10th the size of the full join. We write each partial result to disk
and consolidate with `bind_rows()` afterward. This trades a single large join
for many small joins, each of which fits in memory.
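
Here is a sketch of that pattern with our example data. The right side here is
just the small site table, so this particular join would not need partitioning,
but the mechanics are the same when both sides are large. The sketch assumes
`filter()` resolves the loop variable `yr` from the calling environment, as
dplyr-style verbs do:

```{r join-split-apply}
partials <- character(0)

for (yr in 2022:2024) {
  part_path <- tempfile(fileext = ".vtr")
  tbl(vtr_path) |>
    filter(year == yr) |>                       # one left-side partition
    inner_join(tbl(site_path), by = "site") |>  # small right side here; with two
    write_vtr(part_path)                        # large tables, use the matching partition
  partials <- c(partials, part_path)
}

# Consolidate the partial results into a single file
split_join <- tempfile(fileext = ".vtr")
do.call(bind_rows, lapply(partials, tbl)) |> write_vtr(split_join)

nrow(tbl(split_join) |> collect())
```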

Pre-aggregation is another powerful option. If we only need summary statistics
from the right table, `group_by() |> summarise()` can reduce millions of rows
to a handful of groups before the join. A right-side table with 50 million
observations across 10,000 sites collapses to 10,000 rows after a
site-level aggregation. That fits comfortably in a hash table.

```{r join-pre-agg}
# Right side: per-site-year summary (derived here from the same table)
summary_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  group_by(site, year) |>
  summarise(site_year_avg = mean(abundance)) |>
  write_vtr(summary_path)

# Join the summary back to the detail table
tbl(vtr_path) |>
  left_join(tbl(summary_path), by = c("site", "year")) |>
  select(obs_id, site, year, abundance, site_year_avg) |>
  slice_head(n = 5) |>
  collect()
```

## Multi-file workflows

Partitioned data is common: monthly exports, regional splits, sensor-specific
files. The partitioning dimension matters for query performance. Partitioning by
date works well when most queries filter on a time range, because we can skip
entire files that fall outside the range. Partitioning by category (region,
species, sensor type) works when queries target a specific category. The wrong
partitioning dimension forces us to read every file for every query, negating
the benefit.

`bind_rows()` combines multiple vectra nodes into a single streaming
pipeline. When all inputs share the same schema (column names and types),
vectra creates a C-level `ConcatNode` that reads from each child sequentially.
No data is copied; each source's batches flow through in order.

```{r multifile-setup}
monthly_paths <- character(6)
for (m in 1:6) {
  monthly_paths[m] <- tempfile(fileext = ".vtr")
  idx <- which(obs$year == 2024 & ((obs$obs_id %% 6) + 1) == m)
  month_data <- obs[idx[seq_len(min(200, length(idx)))], ]
  write_vtr(month_data, monthly_paths[m])
}
```

We can combine all monthly files and aggregate without loading them all:

```{r multifile-combine}
nodes <- lapply(monthly_paths, tbl)
combined <- do.call(bind_rows, nodes)

combined |>
  group_by(species) |>
  summarise(
    total_obs  = n(),
    mean_abun  = mean(abundance)
  ) |>
  arrange(desc(total_obs)) |>
  slice_head(n = 5) |>
  collect()
```

The `bind_rows()` call does not read any data. It returns a new node that,
when pulled, first exhausts all batches from the first child, then the
second, and so on. Downstream operations (the grouping, aggregation, sort)
see a single continuous stream.

We can also consolidate the partitioned files into one:

```{r multifile-consolidate}
consolidated <- tempfile(fileext = ".vtr")
nodes2 <- lapply(monthly_paths, tbl)
do.call(bind_rows, nodes2) |> write_vtr(consolidated)

nrow(tbl(consolidated) |> collect())
```

For partitioned files that arrive on a schedule, a common workflow combines
`bind_rows()` with `append_vtr()`. We process the latest batch of files
and append the result to a running archive:

```{r multifile-append}
running_archive <- tempfile(fileext = ".vtr")
initial_nodes <- lapply(monthly_paths[1:3], tbl)
do.call(bind_rows, initial_nodes) |> write_vtr(running_archive)

# Next month's files arrive
new_nodes <- lapply(monthly_paths[4:6], tbl)
do.call(bind_rows, new_nodes) |> append_vtr(running_archive)

nrow(tbl(running_archive) |> collect())
```

This gives us a single growing file that can be queried as a unit, while
each monthly append is a lightweight operation.

At some point we may want to consolidate partitioned files into a single sorted
`.vtr` for better query performance. `bind_rows()` piped into `arrange()` and
then `write_vtr()` does this in one streaming pass: the concat node reads from
each partition, the sort accumulates and spills as needed, and the writer
produces a single file with tight zone-map statistics. The original partitions
can then be archived or deleted.
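
For example, to collapse the monthly partitions into a single file sorted by
site and year:

```{r multifile-sorted}
sorted_consolidated <- tempfile(fileext = ".vtr")

do.call(bind_rows, lapply(monthly_paths, tbl)) |>
  arrange(site, year) |>
  write_vtr(sorted_consolidated, batch_size = 10000)

nrow(tbl(sorted_consolidated) |> collect())
```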

## Format conversion ETL

A one-time conversion from CSV to `.vtr` pays off every time we query the
data afterwards. The `.vtr` format supports dictionary encoding, delta
encoding, zstd compression, and zone-map statistics. Repeated queries on a
`.vtr` file are faster because the engine can skip row groups and decompress
only the columns it needs. CSV requires a full parse every time.

A minimal conversion:

```{r etl-basic}
vtr_archive <- tempfile(fileext = ".vtr")
tbl_csv(csv_path) |> write_vtr(vtr_archive)
```

That single line streams the entire CSV through the vectra writer. The output
file is typed, compressed, and has zone-map statistics. For repeated queries,
the difference is substantial.

In practice, we rarely want a raw copy. The ingest step is a natural place
to clean, filter, and reshape:

```{r etl-clean}
clean_archive <- tempfile(fileext = ".vtr")

tbl_csv(csv_path) |>
  filter(quality != "poor") |>
  mutate(
    abundance_log = log(abundance + 1),
    cover_frac    = cover_pct / 100
  ) |>
  select(-quality) |>
  arrange(site, year) |>
  write_vtr(clean_archive, batch_size = 10000)
```

We filtered out poor-quality records, computed derived columns, dropped the
raw quality flag, sorted by site and year for better zone-map pruning, and
chose a 10,000-row batch size. The CSV was read once, streaming, and the
clean result was written directly to disk.

The inverse direction is also useful. We might need to export a subset
to a colleague who uses SQLite:

```{r etl-sqlite}
sqlite_export <- tempfile(fileext = ".sqlite")

tbl(clean_archive) |>
  filter(year >= 2020) |>
  write_sqlite(sqlite_export, "observations")
```

Or produce a CSV of summary statistics for a report:

```{r etl-summary-csv}
summary_csv <- tempfile(fileext = ".csv")

tbl(clean_archive) |>
  group_by(site, year) |>
  summarise(
    n_species = n_distinct(species),
    total_abundance = sum(abundance)
  ) |>
  write_csv(summary_csv)

read.csv(summary_csv) |> head()
```

Multi-source ETL pipelines combine several inputs. Suppose we have regional
CSV files that we want to merge and convert:

```{r etl-multi}
csv_north <- tempfile(fileext = ".csv")
csv_south <- tempfile(fileext = ".csv")
write.csv(obs[1:25000, ], csv_north, row.names = FALSE)
write.csv(obs[25001:50000, ], csv_south, row.names = FALSE)

merged_vtr <- tempfile(fileext = ".vtr")

bind_rows(
  tbl_csv(csv_north),
  tbl_csv(csv_south)
) |>
  filter(abundance > 0) |>
  write_vtr(merged_vtr, batch_size = 25000)

nrow(tbl(merged_vtr) |> collect())
```

Both CSVs streamed through the concat node, the filter, and the writer.
Neither was fully loaded into R at any point.

## Memory budget planning

Knowing which operations consume memory and how much lets us design pipelines
that stay within our system's limits. The central principle is "stream early,
materialize late." Every row that we can filter out before it reaches a
materializing operation (a sort, a join build, a grouping hash table) is a row
that never occupies memory. Pushing filters upstream is not just a performance
optimization; for larger-than-RAM data, it can be the difference between a
pipeline that completes and one that exhausts memory.

Here is a breakdown by operation type.

**Constant-memory (streaming) operations.** `filter`, `select`, `mutate`,
`limit`, `slice_head`, and `concat` (bind_rows) all process one batch at a
time. Their memory cost is proportional to the batch size and the number of
columns, not the total row count. For a typical batch of 50,000 rows with
10 columns, each batch might occupy a few megabytes. These operations are
safe for any dataset size.

**External sort (arrange).** vectra's sort node accumulates data in memory
up to a 1 GB budget. When the budget is exceeded, it flushes a sorted run
to a temporary file and continues. The final merge reads from all runs
simultaneously, using a heap that holds one row per run. Peak memory is
bounded at 1 GB plus the merge overhead. For datasets smaller than 1 GB
the sort completes entirely in memory.

**Hash aggregation (group_by + summarise).** The `group_agg` node maintains
one accumulator per distinct group key, so memory scales with the number of
distinct groups rather than the number of input rows. If we group by `species`
(12 values), the hash table is tiny. If we group by `obs_id` (50,000
distinct values), it is larger. Grouping by a high-cardinality column on
a billion-row dataset could create millions of accumulators, so it is worth
checking the expected group count.

**Hash join (build side).** The right-side table is fully materialized in a
hash table. Memory cost equals the right-side data size. A 1 million row
reference table with 5 columns might consume 50 to 100 MB. A 100 million row
table would require several gigabytes. The left side streams and adds no
persistent memory. String columns on the build side cost more than numeric
columns because each string value has variable length and requires its own
allocation. A build side with 1 million rows and a 200-character text column
will consume substantially more memory than one with only integer and double
columns.

**Window functions.** Window operations (`row_number`, `lag`, `lead`,
`cumsum`, etc.) operate on the current batch. Memory scales with batch size.
Partitioned windows (via `group_by`) hold data for the current partition.
If partitions are balanced and moderately sized, memory stays bounded.

To estimate memory for a pipeline, we identify the materializing operations
and estimate their footprints:

```{r memory-estimate}
# Example pipeline:
# tbl("huge.vtr") |>
#   filter(year == 2023) |>           # streaming, ~5 MB per batch
#   left_join(sites, by = "site") |>  # build side = 50 sites, ~1 KB
#   group_by(species) |>              # 12 groups, ~1 KB
#   summarise(total = sum(abun)) |>   # 12 accumulators, ~1 KB
#   arrange(desc(total))              # 12 rows, in memory
#
# Total peak: ~5 MB (one batch) + negligible join/agg overhead

# Compare to:
# tbl("huge.vtr") |>
#   arrange(species) |>               # external sort, up to 1 GB
#   left_join(big_ref, by = "id")     # build side = big_ref size
#
# Total peak: 1 GB (sort) + big_ref size
```

The general strategy: stream everything we can, materialize only what we
must. Push filters as early as possible to reduce the volume flowing
through materializing operations. Put the smaller table on the right
side of joins. Pre-aggregate before joining when we only need summaries
from the right side.

To profile actual memory use, `explain()` prints the query plan tree before
execution. We can read the plan to count the materializing nodes and estimate
their size. For the build side of a join, we can `collect()` the right-side
node in isolation and check `object.size()` on the resulting data.frame. That
gives us the R-level footprint, which is a reasonable upper bound on the C-level
hash table size. For sort operations, the question is simpler: if the total
input is under 1 GB, the sort completes in memory; otherwise, it spills.
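
For example, the site metadata we used as a join build side:

```{r build-side-size}
right_side <- tbl(site_path) |> collect()
object.size(right_side)
```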

A practical example that puts these principles together:

```{r memory-pipeline}
final_path <- tempfile(fileext = ".vtr")

tbl(vtr_path) |>
  filter(year >= 2020, quality != "poor") |>
  left_join(tbl(site_path), by = "site") |>
  group_by(region, species) |>
  summarise(
    n_obs     = n(),
    mean_cov  = mean(cover_pct)
  ) |>
  arrange(region, desc(n_obs)) |>
  write_vtr(final_path)

tbl(final_path) |> collect() |> head(10)
```

The filter runs first, reducing the data volume before the join. The join's
build side is 50 rows of site metadata. The aggregation produces at most
4 regions times 12 species = 48 groups. The sort handles 48 rows in memory.
Total peak memory: one batch from the scan plus a few kilobytes. This
pipeline would work unchanged on a table with billions of rows.

## Cleanup

```{r cleanup}
all_files <- c(
  csv_path, vtr_path, clean_path, db_path, export_csv,
  del_demo, paste0(del_demo, ".del"),
  snap_v1, snap_v2,
  sorted_path, site_path, enriched, joined_path,
  summary_path, monthly_paths, consolidated, running_archive,
  vtr_archive, clean_archive, sqlite_export, summary_csv,
  csv_north, csv_south, merged_vtr,
  small_groups, large_groups,
  archive, csv_2017, archive_diff, compacted, compacted_del,
  final_path
)
unlink(all_files[file.exists(all_files)])
```
