nf-databricks plugin

A Nextflow plugin that integrates Databricks clusters into Nextflow data processing workflows. It lets you query data from Databricks tables, execute SQL statements, and feed Databricks data into your Nextflow pipelines.

Quick Start

  1. Add the plugin to your nextflow.config:
plugins {
    id 'nf-databricks@<version>'
}

sql {
    db {
        databricks {
            url      = 'jdbc:databricks://<workspace-url>:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/<warehouse-id>;IgnoreTransactions=1'
            user     = 'token'
            password = '<your-databricks-token>'
        }
    }
}
  2. Use in your workflow:
include { fromQuery; sqlExecute } from 'plugin/nf-databricks'

workflow {
    // Query data from Databricks
    data = channel.fromQuery(
        'SELECT * FROM samples.nyctaxi.trips LIMIT 10',
        db: 'databricks'
    )
    
    // Process and view the data
    data.view { "Row: $it" }
}

Configuration

Configuration Parameters

  • url: The JDBC URL for your Databricks workspace. You can find this in your Databricks SQL endpoint connection details.
  • user: Use 'token' when authenticating with a personal access token.
  • password: Your Databricks personal access token. Avoid hard-coding the token in files you commit; see the sketch below for reading it from the environment.
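
Because nextflow.config is evaluated as Groovy, the token can be read from an environment variable instead of being written into the file. A minimal sketch, assuming the token is exported as DATABRICKS_TOKEN (a variable name chosen here for illustration):

// nextflow.config -- read the access token from the environment.
// DATABRICKS_TOKEN is an assumed name; export it before running:
//   export DATABRICKS_TOKEN=<your-databricks-token>
sql {
    db {
        databricks {
            url      = 'jdbc:databricks://<workspace-url>:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/<warehouse-id>;IgnoreTransactions=1'
            user     = 'token'
            password = System.getenv('DATABRICKS_TOKEN')
        }
    }
}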

Usage

The plugin provides two main functions:

1. fromQuery - Reading Data from Databricks

Use fromQuery to read data from Databricks tables into Nextflow channels:

include { fromQuery } from 'plugin/nf-databricks'

workflow {
    taxi_data = channel
        .fromQuery(
            '''
            SELECT 
                tpep_pickup_datetime,
                tpep_dropoff_datetime,
                trip_distance,
                fare_amount,
                pickup_zip,
                dropoff_zip
            FROM samples.nyctaxi.trips
            WHERE trip_distance > 2.0
            LIMIT 100
            ''',
            db: 'databricks'
        )
    
    // Process the data
    taxi_data.view { row -> 
        "Trip: ${row[0]} -> ${row[1]}, Distance: ${row[2]} miles, Fare: \$${row[3]}"
    }
}
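
Rows are emitted as lists, so downstream code indexes fields by position, as in the view closure above. When named access reads better, each row can be mapped into a Groovy map first. A minimal sketch (the key names are chosen here for illustration, not part of the plugin API):

include { fromQuery } from 'plugin/nf-databricks'

workflow {
    channel
        .fromQuery(
            'SELECT trip_distance, fare_amount FROM samples.nyctaxi.trips LIMIT 10',
            db: 'databricks'
        )
        // Turn each positional row into a named map so later operators
        // can refer to fields by name instead of index.
        .map { row -> [distance: row[0], fare: row[1]] }
        .view { trip -> "Distance: ${trip.distance} miles, Fare: \$${trip.fare}" }
}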

2. sqlExecute - Executing SQL Statements

Use sqlExecute to run SQL statements on Databricks:

include { sqlExecute } from 'plugin/nf-databricks'

workflow {
    // Create a table
    sqlExecute(
        db: 'databricks',
        statement: '''
            CREATE OR REPLACE TABLE my_schema.my_table AS 
            SELECT * FROM samples.nyctaxi.trips 
            WHERE trip_distance > 0 
            LIMIT 1000
        '''
    )
    
    // Insert data
    sqlExecute(
        db: 'databricks',
        statement: '''
            INSERT INTO my_schema.my_table 
            VALUES ('2023-10-01 08:00:00', '2023-10-01 08:30:00', 5.2, 15.50, '10001', '10002')
        '''
    )
}

Complete Example

Here's a comprehensive example that demonstrates creating tables, querying data, processing it with Nextflow operators, and writing results back to Databricks:

include { fromQuery; sqlExecute } from 'plugin/nf-databricks'

workflow {
    // Step 1: Create a table from sample data
    sqlExecute(
        db: 'databricks',
        statement: '''
            CREATE OR REPLACE TABLE my_schema.taxi_sample AS 
            SELECT 
                tpep_pickup_datetime,
                tpep_dropoff_datetime,
                trip_distance,
                fare_amount,
                pickup_zip,
                dropoff_zip
            FROM samples.nyctaxi.trips 
            WHERE trip_distance > 0 
            LIMIT 20
        '''
    )

    // Step 2: Query and process the data
    taxi_data = channel
        .fromQuery(
            '''
            SELECT * FROM my_schema.taxi_sample
            ''',
            db: 'databricks'
        )
        .map { row -> 
            def (pickup, dropoff, distance, fare, p_zip, d_zip) = row
            def cost_per_mile = (fare / distance).round(2)
            tuple(pickup, dropoff, distance, fare, cost_per_mile, p_zip, d_zip)
        }
        .filter { it[2] > 2.0 } // Only trips longer than 2 miles

    // Step 3: Create analysis table
    sqlExecute(
        db: 'databricks',
        statement: '''
            CREATE TABLE IF NOT EXISTS my_schema.taxi_analysis (
                pickup_time TIMESTAMP,
                dropoff_time TIMESTAMP,
                distance DOUBLE,
                fare DOUBLE,
                cost_per_mile DOUBLE,
                pickup_zip STRING,
                dropoff_zip STRING
            )
        '''
    )

    // Step 4: Insert processed data back
    taxi_data
        .map { row -> 
            """
            INSERT INTO my_schema.taxi_analysis 
            VALUES ('${row[0]}', '${row[1]}', ${row[2]}, ${row[3]}, ${row[4]}, '${row[5]}', '${row[6]}')
            """
        }
        .map { query ->
            sqlExecute(
                db: 'databricks',
                statement: query
            )
        }
}
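
Step 4 issues one INSERT statement per row, which costs a round trip to the warehouse for every trip. For larger channels it may be cheaper to gather all rows and issue a single multi-row INSERT. A sketch of that variant, reusing the taxi_data channel from the example above (and, like the original, interpolating values straight into the SQL text, so it is only suitable for trusted data):

    // Alternative step 4: batch every row into one INSERT statement
    taxi_data
        .toList() // wait for all rows, emitting a single list of tuples
        .map { rows ->
            def values = rows.collect { r ->
                "('${r[0]}', '${r[1]}', ${r[2]}, ${r[3]}, ${r[4]}, '${r[5]}', '${r[6]}')"
            }.join(', ')
            sqlExecute(
                db: 'databricks',
                statement: "INSERT INTO my_schema.taxi_analysis VALUES ${values}"
            )
        }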

Building

To build the plugin:

make assemble

Testing with Nextflow

The plugin can be tested without building Nextflow from source:

  1. Build and install the plugin into your local Nextflow plugins directory: make install
  2. Run a pipeline with the plugin: nextflow run hello -plugins nf-databricks@<version>

Publishing

Plugins can be published to a central plugin registry to make them accessible to the Nextflow community.

Follow these steps to publish the plugin to the Nextflow Plugin Registry:

  1. Create a file named $HOME/.gradle/gradle.properties, where $HOME is your home directory, and add the following property (see the example after this list):
    • pluginRegistry.accessToken: Your Nextflow Plugin Registry access token.
  2. Package and create a release for your plugin on GitHub: make release
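
The file is a plain Gradle properties file, one key=value pair per line; the token value below is a placeholder:

# $HOME/.gradle/gradle.properties
pluginRegistry.accessToken=<your-registry-access-token>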

Note

The Nextflow Plugin Registry is currently available as a private beta. Contact [email protected] to learn how to get access to it.
