Skip to content

duckdb in parallel #945

@Saarialho

Description

@Saarialho

Hello!

The Problem

I'm trying do parallel computations in duckdb and then export the results (in R using windows). My actual dataset is quite massive and I need to do hundreds of computations every day, each result containing around 30k rows.

I am aware that duckdb can either have one read-write process or multiple read only processes. However concurrency is supported within a single process by threading (https://duckdb.org/docs/guides/python/multiple_threads). I am assuming this is not possible in R?

I could write in parallel to parquet but I often need to do backfills which is quite heavy operation since with parquet I need to open and write the whole dataset due to change in one column.

Second option is exporting to SQLite using the extension but in order for that to work I need to query the duckdb result in memory and then export to SQLite. In my reprex the second method gives lock errors.

This quite unambiguous problem and probably wrong place to ask but here you go

Reproducible Example

library(RSQLite)
library(duckdb)
#> Loading required package: DBI
library(furrr)
#> Warning: package 'furrr' was built under R version 4.2.3
#> Loading required package: future
#> Warning: package 'future' was built under R version 4.2.3

# Create temporary directories for databases
temp_dir <- tempdir()
db_path <- file.path(temp_dir, "duckdb_test.db")
sqlite_path <- file.path(temp_dir, "sqlite_test.db")

# Generate sample data
create_sample_data <- function(con) {
  # Create dates and IDs
  dates <- as.Date("2024-01-01") + 0:50
  ids_per_date <- 10
  
  # Generate sample data
  sample_data <- expand.grid(
    date = dates,
    id = sprintf("ID_%03d", 1:ids_per_date),
    stringsAsFactors = FALSE
  )
  
  # Create and populate the table
  dbWriteTable(con, "test_data", sample_data, overwrite = TRUE)
  
  # Verify data
  count <- dbGetQuery(con, "SELECT COUNT(*) as count FROM test_data")
  message(sprintf("Created sample data with %d rows", count$count))
}

con <- dbConnect(duckdb::duckdb(), db_path)
create_sample_data(con)
#> Created sample data with 510 rows

# Install and load SQLite extension
dbExecute(con, "INSTALL sqlite")
#> [1] 0
dbExecute(con, "LOAD sqlite")
#> [1] 0

dbDisconnect(con, shutdown = TRUE)

# Get dates for processing
get_dates <- function() {
  con <- dbConnect(duckdb::duckdb(db_path, read_only = TRUE))
  on.exit(dbDisconnect(con, shutdown = TRUE))
  dates <- dbGetQuery(con, "SELECT DISTINCT date FROM test_data ORDER BY date")$date
  dates
}

# Approach 1: Using SQLite connection directly
process_date_sqlite <- function(date) {
  # Connect to DuckDB
  con_duck <- dbConnect(duckdb::duckdb(db_path, read_only = TRUE))
  on.exit(dbDisconnect(con_duck, shutdown = TRUE), add = TRUE)
  
  # Query data from DuckDB
  query <- sprintf("
    SELECT id, date
    FROM test_data
    WHERE date = '%s'
  ", date)
  
  data <- dbGetQuery(con_duck, query)
  
  if (nrow(data) > 0) {
    # Connect to SQLite
    con_sqlite <- dbConnect(RSQLite::SQLite(), sqlite_path)
    on.exit(dbDisconnect(con_sqlite), add = TRUE)
    
    # Set pragmas
    dbExecute(con_sqlite, "PRAGMA busy_timeout = 10000")
    dbExecute(con_sqlite, "PRAGMA journal_mode = WAL")
    
    tryCatch({
      # Start transaction
      dbExecute(con_sqlite, "BEGIN IMMEDIATE TRANSACTION")
      
      # Ensure table exists
      dbExecute(con_sqlite, "
        CREATE TABLE IF NOT EXISTS test_data_combined (
          id TEXT COLLATE NOCASE NOT NULL,
          date TEXT NOT NULL,
          UNIQUE(id, date)
        ) STRICT
      ")
      
      # Append data
      dbWriteTable(con_sqlite, "test_data_combined", data, append = TRUE)
      
      # Commit transaction
      dbExecute(con_sqlite, "COMMIT TRANSACTION")
      
      message(sprintf("Approach 1: Successfully processed date %s", date))
    }, error = function(e) {
      dbExecute(con_sqlite, "ROLLBACK TRANSACTION")
      warning(sprintf("Approach 1: Error processing date %s: %s", date, e$message))
    })
  }
}

# Approach 2: Using DuckDB's SQLite extension
process_date_duckdb <- function(date) {
  # Connect to DuckDB
  con_duck <- dbConnect(duckdb::duckdb(db_path, read_only = TRUE))
  on.exit(dbDisconnect(con_duck, shutdown = TRUE), add = TRUE)
  
  # Attach SQLite database
  dbExecute(con_duck, sprintf("
    ATTACH '%s' AS sqlite_db (
      TYPE SQLITE,
      READ_ONLY false,
      BUSY_TIMEOUT 10000,
      JOURNAL_MODE 'WAL'
    )", sqlite_path))
  
  # Ensure target table exists
  dbExecute(con_duck, "
    CREATE TABLE IF NOT EXISTS sqlite_db.test_data_combined (
      id TEXT COLLATE NOCASE NOT NULL,
      date TEXT NOT NULL,
      UNIQUE(id, date)
    )
  ")
  
  # Direct insert with explicit CAST
  transfer_query <- sprintf("
    INSERT INTO sqlite_db.test_data_combined 
    SELECT CAST(id AS TEXT) AS id, date
    FROM test_data
    WHERE date = '%s'
  ", date)
  
  tryCatch({
    dbBegin(con_duck)
    dbExecute(con_duck, transfer_query)
    dbCommit(con_duck)
    message(sprintf("Approach 2: Successfully processed date %s", date))
  }, error = function(e) {
    dbRollback(con_duck)
    warning(sprintf("Approach 2: Error processing date %s: %s", date, e$message))
  })
}


dates <- get_dates()
future::plan("multisession", workers = 4)
#future::plan(sequential)
options(future.rng.onMisuse = "ignore")

tictoc::tic()
# Test Approach 1: SQLite connection
message("\nTesting Approach 1: Direct SQLite connection")
#> 
#> Testing Approach 1: Direct SQLite connection
furrr::future_walk(dates, process_date_sqlite)
#> Approach 1: Successfully processed date 2024-01-01
#> Approach 1: Successfully processed date 2024-01-02
#> Approach 1: Successfully processed date 2024-01-03
#> Approach 1: Successfully processed date 2024-01-04
#> Approach 1: Successfully processed date 2024-01-05
#> Approach 1: Successfully processed date 2024-01-06
#> Approach 1: Successfully processed date 2024-01-07
#> Approach 1: Successfully processed date 2024-01-08
#> Approach 1: Successfully processed date 2024-01-09
#> Approach 1: Successfully processed date 2024-01-10
#> Approach 1: Successfully processed date 2024-01-11
#> Approach 1: Successfully processed date 2024-01-12
#> Approach 1: Successfully processed date 2024-01-13
#> Approach 1: Successfully processed date 2024-01-14
#> Approach 1: Successfully processed date 2024-01-15
#> Approach 1: Successfully processed date 2024-01-16
#> Approach 1: Successfully processed date 2024-01-17
#> Approach 1: Successfully processed date 2024-01-18
#> Approach 1: Successfully processed date 2024-01-19
#> Approach 1: Successfully processed date 2024-01-20
#> Approach 1: Successfully processed date 2024-01-21
#> Approach 1: Successfully processed date 2024-01-22
#> Approach 1: Successfully processed date 2024-01-23
#> Approach 1: Successfully processed date 2024-01-24
#> Approach 1: Successfully processed date 2024-01-25
#> Approach 1: Successfully processed date 2024-01-26
#> Approach 1: Successfully processed date 2024-01-27
#> Approach 1: Successfully processed date 2024-01-28
#> Approach 1: Successfully processed date 2024-01-29
#> Approach 1: Successfully processed date 2024-01-30
#> Approach 1: Successfully processed date 2024-01-31
#> Approach 1: Successfully processed date 2024-02-01
#> Approach 1: Successfully processed date 2024-02-02
#> Approach 1: Successfully processed date 2024-02-03
#> Approach 1: Successfully processed date 2024-02-04
#> Approach 1: Successfully processed date 2024-02-05
#> Approach 1: Successfully processed date 2024-02-06
#> Approach 1: Successfully processed date 2024-02-07
#> Approach 1: Successfully processed date 2024-02-08
#> Approach 1: Successfully processed date 2024-02-09
#> Approach 1: Successfully processed date 2024-02-10
#> Approach 1: Successfully processed date 2024-02-11
#> Approach 1: Successfully processed date 2024-02-12
#> Approach 1: Successfully processed date 2024-02-13
#> Approach 1: Successfully processed date 2024-02-14
#> Approach 1: Successfully processed date 2024-02-15
#> Approach 1: Successfully processed date 2024-02-16
#> Approach 1: Successfully processed date 2024-02-17
#> Approach 1: Successfully processed date 2024-02-18
#> Approach 1: Successfully processed date 2024-02-19
#> Approach 1: Successfully processed date 2024-02-20
tictoc::toc()  
#> 2.98 sec elapsed

# Clean up SQLite database for second test
unlink(sqlite_path)
  
# Test Approach 2: DuckDB SQLite extension
tictoc::tic()
message("\nTesting Approach 2: DuckDB SQLite extension")
#> 
#> Testing Approach 2: DuckDB SQLite extension
furrr::future_walk(dates, process_date_duckdb)
#> Approach 2: Successfully processed date 2024-01-01
#> Approach 2: Successfully processed date 2024-01-02
#> Approach 2: Successfully processed date 2024-01-03
#> Approach 2: Successfully processed date 2024-01-04
#> Approach 2: Successfully processed date 2024-01-05
#> Approach 2: Successfully processed date 2024-01-06
#> Approach 2: Successfully processed date 2024-01-07
#> Approach 2: Successfully processed date 2024-01-08
#> Approach 2: Successfully processed date 2024-01-09
#> Approach 2: Successfully processed date 2024-01-10
#> Approach 2: Successfully processed date 2024-01-11
#> Approach 2: Successfully processed date 2024-01-12
#> Approach 2: Successfully processed date 2024-01-13
#> Approach 2: Successfully processed date 2024-01-14
#> Approach 2: Successfully processed date 2024-01-15
#> Approach 2: Successfully processed date 2024-01-16
#> Approach 2: Successfully processed date 2024-01-17
#> Approach 2: Successfully processed date 2024-01-18
#> Approach 2: Successfully processed date 2024-01-19
#> Approach 2: Successfully processed date 2024-01-20
#> Approach 2: Successfully processed date 2024-01-21
#> Approach 2: Successfully processed date 2024-01-22
#> Approach 2: Successfully processed date 2024-01-23
#> Approach 2: Successfully processed date 2024-01-24
#> Approach 2: Successfully processed date 2024-01-25
#> Approach 2: Successfully processed date 2024-01-26
#> Approach 2: Successfully processed date 2024-01-27
#> Approach 2: Successfully processed date 2024-01-28
#> Approach 2: Successfully processed date 2024-01-29
#> Approach 2: Successfully processed date 2024-01-30
#> Approach 2: Successfully processed date 2024-01-31
#> Warning in value[[3L]](cond): Approach 2: Error processing date 2024-02-01: rapi_execute: Failed to run query
#> Error: Invalid Error: database is locked
#> Approach 2: Successfully processed date 2024-02-02
#> Approach 2: Successfully processed date 2024-02-03
#> Approach 2: Successfully processed date 2024-02-04
#> Approach 2: Successfully processed date 2024-02-05
#> Approach 2: Successfully processed date 2024-02-06
#> Approach 2: Successfully processed date 2024-02-07
#> Approach 2: Successfully processed date 2024-02-08
#> Approach 2: Successfully processed date 2024-02-09
#> Approach 2: Successfully processed date 2024-02-10
#> Approach 2: Successfully processed date 2024-02-11
#> Approach 2: Successfully processed date 2024-02-12
#> Approach 2: Successfully processed date 2024-02-13
#> Approach 2: Successfully processed date 2024-02-14
#> Approach 2: Successfully processed date 2024-02-15
#> Approach 2: Successfully processed date 2024-02-16
#> Approach 2: Successfully processed date 2024-02-17
#> Approach 2: Successfully processed date 2024-02-18
#> Approach 2: Successfully processed date 2024-02-19
#> Approach 2: Successfully processed date 2024-02-20
tictoc::toc()  
#> 3.45 sec elapsed

Created on 2025-01-03 with reprex v2.0.2

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions