The Custom SQL node lets you write SQL queries against pipeline data. By default, queries run locally using Apache DataFusion, a high-performance analytical SQL engine that supports standard PostgreSQL syntax. Your upstream data is available as a table called input.

SQL dialect

Custom SQL uses PostgreSQL-compatible syntax. Standard SQL features are fully supported:
  • Common table expressions (CTEs) with WITH
  • Window functions (ROW_NUMBER, RANK, LAG, LEAD, etc.)
  • JOIN / LEFT JOIN / CROSS JOIN
  • UNION ALL / INTERSECT / EXCEPT
  • Subqueries (scalar, IN, EXISTS)
  • CASE WHEN / COALESCE / NULLIF
  • FILTER clause on aggregates
  • Type casting with CAST(), TRY_CAST(), and :: syntax
  • INTERVAL arithmetic
  • Boolean expressions and IS [NOT] NULL
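
Several of these features can be combined in a single query. A sketch, assuming hypothetical region, status, and amount columns on the input table:

```sql
-- Illustrative only: region, status, and amount are assumed columns.
WITH regional AS (
  SELECT
    region,
    SUM(amount) AS total_amount,
    -- FILTER restricts one aggregate without a second pass
    COUNT(*) FILTER (WHERE status = 'refunded') AS refunds
  FROM input
  GROUP BY region
)
SELECT
  region,
  total_amount,
  refunds,
  RANK() OVER (ORDER BY total_amount DESC) AS region_rank,
  refunds::DOUBLE / NULLIF(total_amount, 0) AS refund_ratio
FROM regional
```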

Function reference

Aggregate functions

| Function | Description |
| --- | --- |
| COUNT(*), COUNT(col) | Row or non-null count |
| SUM(col), AVG(col) | Sum and average |
| MIN(col), MAX(col) | Minimum and maximum |
| STRING_AGG(col, sep) | Concatenate values with separator |
| ARRAY_AGG(col) | Collect values into an array |
| PERCENTILE_CONT(pct) WITHIN GROUP (ORDER BY col) | Continuous percentile (use 0.5 for median) |
| STDDEV(col), VARIANCE(col) | Standard deviation and variance |
| COUNT(DISTINCT col) | Distinct count |
| BOOL_AND(col), BOOL_OR(col) | Boolean aggregation |
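
For example, a per-group summary combining several aggregates (the category, customer_id, product_name, and amount columns are illustrative):

```sql
SELECT
  category,
  COUNT(DISTINCT customer_id) AS customers,
  STRING_AGG(product_name, ', ') AS products,
  PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS median_amount,
  STDDEV(amount) AS amount_stddev
FROM input
GROUP BY category
```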
String functions

| Function | Description |
| --- | --- |
| UPPER(s), LOWER(s) | Case conversion |
| TRIM(s), LTRIM(s), RTRIM(s) | Whitespace removal |
| SUBSTRING(s FROM pos FOR len) | Extract substring |
| REPLACE(s, from, to) | Replace occurrences |
| CONCAT(a, b, ...) | Concatenate strings |
| SPLIT_PART(s, delim, idx) | Split and select part |
| LENGTH(s), CHAR_LENGTH(s) | String length |
| POSITION(sub IN s) | Find substring position |
| LEFT(s, n), RIGHT(s, n) | Left/right substring |
| LPAD(s, len, fill), RPAD(s, len, fill) | Pad to length |
| REGEXP_REPLACE(s, pattern, replacement) | Regex replacement |
| INITCAP(s) | Title case |
| MD5(s), SHA256(s) | Hash functions |
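
A typical text-cleanup sketch (the full_name, email, phone, and id columns are assumptions, not part of the node's schema):

```sql
SELECT
  INITCAP(TRIM(full_name)) AS display_name,        -- normalize spacing and case
  SPLIT_PART(email, '@', 2) AS email_domain,       -- part after the @
  REGEXP_REPLACE(phone, '[^0-9]', '') AS digits_only,
  LPAD(CAST(id AS VARCHAR), 8, '0') AS padded_id   -- zero-pad to 8 characters
FROM input
```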
Date and time functions

| Function | Description |
| --- | --- |
| NOW(), CURRENT_DATE, CURRENT_TIMESTAMP | Current date/time |
| DATE_TRUNC(unit, ts) | Truncate to unit (year, month, day, hour, etc.) |
| DATE_PART(unit, ts), EXTRACT(unit FROM ts) | Extract date component |
| TO_CHAR(ts, format) | Format timestamp as string |
| TO_TIMESTAMP(s, format) | Parse string to timestamp |
| ts + INTERVAL '1 day' | Date arithmetic |
| DATE_BIN(interval, ts, origin) | Bin timestamps into intervals |
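
For instance, a rolling monthly rollup (created_at is an assumed column):

```sql
SELECT
  DATE_TRUNC('month', created_at) AS month,
  COUNT(*) AS events,
  MIN(created_at) AS first_event
FROM input
WHERE created_at >= NOW() - INTERVAL '90 days'  -- last ~3 months only
GROUP BY 1
ORDER BY 1
```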
Window functions

| Function | Description |
| --- | --- |
| ROW_NUMBER() OVER (...) | Sequential row number |
| RANK() OVER (...) | Rank with gaps |
| DENSE_RANK() OVER (...) | Rank without gaps |
| LAG(col, offset) OVER (...) | Previous row value |
| LEAD(col, offset) OVER (...) | Next row value |
| FIRST_VALUE(col) OVER (...) | First value in window |
| LAST_VALUE(col) OVER (...) | Last value in window |
| NTILE(n) OVER (...) | Divide into n buckets |
| SUM(col) OVER (...) | Running sum |
| AVG(col) OVER (...) | Running average |
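
Different window functions can use different PARTITION BY clauses in the same query. A sketch assuming customer_id, month, and revenue columns:

```sql
SELECT
  customer_id,
  month,
  revenue,
  -- quartile within each month, highest revenue first
  NTILE(4) OVER (PARTITION BY month ORDER BY revenue DESC) AS quartile,
  -- change versus the same customer's previous month
  revenue - LAG(revenue, 1) OVER (
    PARTITION BY customer_id ORDER BY month
  ) AS change_vs_prev
FROM input
```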
Conditional and type functions

| Function | Description |
| --- | --- |
| CAST(val AS type) | Type conversion (errors on failure) |
| TRY_CAST(val AS type) | Type conversion (returns NULL on failure) |
| val::type | Shorthand cast |
| COALESCE(a, b, ...) | First non-null value |
| NULLIF(a, b) | Returns NULL if a = b |
| CASE WHEN ... THEN ... ELSE ... END | Conditional logic |
| GREATEST(a, b), LEAST(a, b) | Max/min of values |
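
These combine well for defensive parsing of messy input. A sketch with assumed raw_score and country columns:

```sql
SELECT
  -- NULL on unparseable input, then fall back to a default
  COALESCE(TRY_CAST(raw_score AS DOUBLE), 0.0) AS score,
  CASE
    WHEN TRY_CAST(raw_score AS DOUBLE) >= 0.8 THEN 'high'
    WHEN TRY_CAST(raw_score AS DOUBLE) >= 0.5 THEN 'medium'
    ELSE 'low'
  END AS score_band,
  NULLIF(country, '') AS country   -- treat empty string as NULL
FROM input
```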
Math functions

| Function | Description |
| --- | --- |
| ABS(x), CEIL(x), FLOOR(x) | Absolute, ceiling, floor |
| ROUND(x, d), TRUNC(x, d) | Round and truncate |
| x % y, POWER(x, y) | Modulo and power |
| SQRT(x), LN(x), LOG(base, x) | Root and logarithm |
| RANDOM() | Random float 0–1 |
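
RANDOM() is handy for sampling as well as for generating values. A sketch assuming amount and rate columns:

```sql
SELECT
  ROUND(amount, 2) AS amount_rounded,
  POWER(1 + rate, 12) AS annualized   -- compound a monthly rate over a year
FROM input
WHERE RANDOM() < 0.01                 -- keep roughly 1% of rows
```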

Common patterns

Deduplicate rows

Keep only the latest record per key:
```sql
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY customer_id
    ORDER BY updated_at DESC
  ) AS rn
  FROM input
) sub
WHERE rn = 1
```

Running total

Compute a cumulative sum per account, ordered by time:
```sql
SELECT *,
  SUM(amount) OVER (
    PARTITION BY account_id
    ORDER BY created_at
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS running_total
FROM input
```

Pivot with CASE WHEN

PIVOT syntax is not supported. Use CASE WHEN instead:
```sql
SELECT
  customer_id,
  SUM(CASE WHEN category = 'A' THEN amount ELSE 0 END) AS category_a,
  SUM(CASE WHEN category = 'B' THEN amount ELSE 0 END) AS category_b,
  SUM(CASE WHEN category = 'C' THEN amount ELSE 0 END) AS category_c
FROM input
GROUP BY customer_id
```

Unpivot with UNION ALL

UNPIVOT syntax is not supported. Use UNION ALL instead:
```sql
SELECT id, 'jan' AS month, jan_sales AS sales FROM input
UNION ALL
SELECT id, 'feb' AS month, feb_sales AS sales FROM input
UNION ALL
SELECT id, 'mar' AS month, mar_sales AS sales FROM input
```

Conditional aggregation with FILTER

Count and average over different subsets of each group in a single pass:
```sql
SELECT
  region,
  COUNT(*) AS total,
  COUNT(*) FILTER (WHERE status = 'active') AS active_count,
  AVG(revenue) FILTER (WHERE revenue > 0) AS avg_positive_revenue
FROM input
GROUP BY region
```

Gap detection

Measure the time elapsed since each user's previous event:
```sql
SELECT *,
  created_at - LAG(created_at) OVER (
    PARTITION BY user_id ORDER BY created_at
  ) AS time_since_last
FROM input
```

Execution engines

Each Custom SQL node can run on a different engine. Choose the best one for your workload:
| Engine | Best for | Cost |
| --- | --- | --- |
| Local (DataFusion) | Most queries, < 5M rows | Free |
| Warehouse | Large datasets, joining with warehouse tables | Customer warehouse compute |
| Spark Connect | Very large datasets (50M+ rows) | Spark cluster compute |
Set the default engine in Settings → Compute. Override per-node using the engine selector dropdown in the Custom SQL configuration panel.
Use the Execution Plan preview to see which engine each node will use before running the pipeline.

AI assistance

The AI Copilot can generate Custom SQL from natural language descriptions. Open the prompt box in the Custom SQL editor and describe what you need. Copilot generates PostgreSQL-compatible SQL targeting the DataFusion engine. Use Check with AI to validate your SQL for syntax errors, type mismatches, and performance suggestions.

Advanced nodes

All advanced node types including Python and Notebooks.

SQL assistance

AI-powered SQL generation and optimization.

Pipeline troubleshooting

Common errors and debugging steps.

Compute settings

Configure default SQL engine and thresholds.