Script workflows in Databricks

Workflows can be constructed through the Databricks Workflows user interface (UI); however, for large or complex workflows the UI can be a time-consuming way to build them. In these scenarios it is quicker, and more in line with RAP principles, to script your workflow.

For a pipeline to be built there must be scripts, queries or notebooks available for Databricks to read, either located in your workspace or in a Git repository.

For this example we will create a folder in our workspace, create two test notebooks to comprise the workflow, and a third to script the job and set it off running. We’ll also set it up to notify us by email when the workflow successfully completes.

  1. Create a folder in your workspace to store your notebooks. First click ‘Workspace’ in the sidebar (1), then navigate to your user folder (2). Then click the blue ‘Create’ button (3) and select ‘Folder’. Give the folder a name (4) such as ‘Test workflow’ and then click the blue ‘Create’ button (5).

  2. Once in your new folder, click the blue ‘Create’ button again, this time choosing ‘Notebook’. Once in your new notebook, retitle it to ‘Test task 1’ (1), and set the default language to R (2). Then in the first code chunk write print("This is a test task") (3).

  3. Create a second notebook in the same folder titled ‘Test task 2’, and in the first code chunk write print("This is another test task").

  4. Create a third notebook and title it ‘Create and run job’. In the first cell load the tidyverse package. Install the devtools package and load it, then use its install_github() function to install the databricks package. Load the newly installed databricks package.

    library(tidyverse)

    # install and load devtools so we can install packages from GitHub
    install.packages("devtools")
    library(devtools)

    # install the databricks SDK for R from GitHub, then load it
    install_github("databrickslabs/databricks-sdk-r")
    library(databricks)
  5. Create a new code chunk and create a text widget to contain your Databricks access token. Run this cell to create the widget at the top of the page. Once the widget is there, enter your personal access token into the text box.
    dbutils.widgets.text("api_token", "")
Databricks access token

Personal access tokens (PATs) are unique codes generated to let Databricks know who is accessing it from the outside. A PAT functions as a password and so must not be shared with anyone.

If you haven’t already generated a Databricks token you can find instructions on how to do so in the Setup Databricks personal compute cluster with RStudio article.

Don’t put your token into the code

The reason we’re using a widget here for your access token is that we don’t want to take any risk of someone else being able to view your PAT. If we were to hardcode it into the notebook, then anyone with access to the code would be able to copy your PAT and ‘masquerade’ as you.

  6. We can now connect to the API through the databricks package using the databricks::DatabricksClient() function. It requires the host, which is the URL of the Databricks platform up to (and including) the first / after the domain name, and your token. We’ll store the result in a variable called client as we need to pass this to the other functions in the databricks package.
    We can then use the databricks::clustersList() function to fetch a list of the clusters, which we can view using display().

    host <- "https://adb-5037484389568426.6.azuredatabricks.net/"
    api_token <- dbutils.widgets.get("api_token")
    
    client <- databricks::DatabricksClient(host = host, token = api_token)
    
    clusters <- databricks::clustersList(client)
    
    display(clusters %>% select(cluster_id, creator_user_name, cluster_name, spark_version, cluster_cores, cluster_memory_mb))

    Clusters

    The databricks::clustersList() function will return any clusters that you have permission to see.

    The data returned by the function is hierarchical, and a single ‘column’ may contain several other columns. As the display() function renders a table, you’ll have to select only columns that display() knows how to show. Generally, these are the columns at the left-most position when you run str(databricks::clustersList(client)), which shows the structure of the data.
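
    A quick way to see this structure for yourself is to run the str() call mentioned above on its own in a cell:

    # show the structure of the clusters data: the top-level (left-most)
    # columns are the ones display() can render directly
    str(databricks::clustersList(client))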

  7. Make a note of your cluster ID and save it in a variable called cluster_id. You could automate this step by filtering the clusters data frame, as long as you ensure that it only returns a single cluster_id (see the sketch after the code below).

      cluster_id <- "<your cluster id>"
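
    As a minimal sketch, you could automate this like so (assuming your cluster name uniquely identifies your cluster; the name below is a placeholder):

      # filter the clusters data frame down to your cluster by name
      my_cluster <- clusters %>%
        filter(cluster_name == "<your cluster name>")

      # stop if the filter matched zero or more than one cluster
      stopifnot(nrow(my_cluster) == 1)

      cluster_id <- my_cluster$cluster_id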
  8. Create a new code block. We’ll start by setting some parameters for the job. Firstly we’ll need a job_name and the paths to the notebooks we want to include in the workflow (adjust these paths to point to the notebooks you created earlier). We’ll also need to create a unique task_key for each of the notebook tasks we’re going to set up.

    job_name <- "test job"
    first_notebook_path <- "/Users/nicholas.treece@education.gov.uk/R SDK/Test Notebook"
    second_notebook_path <- "/Users/nicholas.treece@education.gov.uk/R SDK/Test Notebook 2"
    task_key_1 <- "test_key"
    task_key_2 <- "test_key_2"

We can then define the tasks as lists. There are many options available when creating a task, a full list of which can be found in the tasks section of the Jobs API documentation. When reading this documentation, any parameter that is marked as an object needs to be passed as a list (list()) in R, and anything marked as an array should be passed as a vector (c()).

For the first task we’ll give it the first task_key we created above, and tell it to run on our existing cluster by passing the ID of our cluster to existing_cluster_id. We’ll then specify that it is a notebook_task, and pass that a list with the notebook_path and the source, which we will set to WORKSPACE (as opposed to a Git repository) for the purposes of this tutorial.

 first_job_task <- list(
  task_key = task_key_1,
  existing_cluster_id = cluster_id,
  notebook_task = list(
                    notebook_path = first_notebook_path,
                    source = "WORKSPACE"
                  )
 )

For the second task we will do the same, but using the second task_key and notebook_path we defined. In addition, we’ll add a depends_on clause with the previous task_key (passed in a list), and specify it is only to run_if ALL_SUCCESS. This means that the second task won’t begin processing unless all of the tasks it depends_on have completed successfully.

second_job_task <- list(
  task_key = task_key_2,
  existing_cluster_id = cluster_id,
  notebook_task = list(
                    notebook_path = second_notebook_path,
                    source = "WORKSPACE"
                  ),
  depends_on = list(task_key = task_key_1),
  run_if = "ALL_SUCCESS"
 )
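
If you would like to sanity check roughly how one of these task lists will look as JSON when it reaches the API, one option is to preview it with jsonlite. This is an optional sketch and assumes the jsonlite package is available on your cluster:

# preview the first task as JSON (objects come from list(), arrays from c(), as described above)
jsonlite::toJSON(first_job_task, auto_unbox = TRUE, pretty = TRUE)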
  9. Now that we have both of our tasks defined, we can create the job using the databricks::jobsCreate() function. We pass it the client as the first argument, then the job name we defined. The tasks are passed as a list which contains each of the task lists we built above.
    We’ll also tell it to send us email_notifications by passing a list with an on_success element containing a vector of email addresses.

    The function returns the ID of the job we just created, so we will want to store the response in a variable called workflow so we can refer to it later.

    workflow <- jobsCreate(client,
                          name = job_name,
                          tasks = list(first_job_task, # first task (a list)
                                       second_job_task), # second task (a list)
                          email_notifications = list(
                                                  on_success = c("your-email")
                                                ))
    Lists of lists

    A list() in R can contain any number and type of data, including other list()s. This makes it excellent for storing hierarchical data in one place; however, it can get confusing quite quickly.

    Sometimes it’s easier to break these list()s up into pieces by defining them separately, as we did above by defining the task lists separately and then passing them to the tasks argument of the jobsCreate() function.

    This often makes the code easier to think about and construct, and certainly makes it easier to read. Consider the code below, which does exactly the same thing as the code above but is written all at once.

    workflow <- jobsCreate(client,
                          name = job_name,
                          tasks = list(
                                    list(
                                      task_key = task_key_1,
                                      existing_cluster_id = cluster_id,
                                      notebook_task = list(
                                                        notebook_path = first_notebook_path,
                                                        source = "WORKSPACE"
                                                      )
                                    ),
                                    list(
                                      task_key = task_key_2,
                                      existing_cluster_id = cluster_id,
                                      notebook_task = list(
                                                        notebook_path = second_notebook_path,
                                                        source = "WORKSPACE"
                                                      ),
                                      depends_on = list(task_key = task_key_1),
                                      run_if = "ALL_SUCCESS"
                                    )
                                  ),
                          email_notifications = list(
                                                  on_success = c("your-email")
                                                )
                          )

    We can see here that the code is getting very long, and it is also harder to see which options relate to which list. If we weren’t diligent with indentation here, we’d have to resort to counting brackets to see what belonged where. This is especially problematic if you accidentally delete a bracket and need to work out where it was meant to go.

  10. We can now get the ID of the job that was created and tell the API to run the job. In a new code chunk we’ll store the job_id from the workflow variable above. We’ll then use the databricks::jobsRunNow() function to run the workflow we just created by passing it the job_id we just stored. We’ll also store the run_id returned by the databricks::jobsRunNow() function in a variable called job_run_id.

    job_id <- workflow$job_id
    
    job_run <- jobsRunNow(client, 
                          job_id = job_id)
    
    job_run_id <- job_run$run_id

    We will now use this to create links to the job and the specific run of the job we just set off.

  11. In a new code cell, define a job_link by paste0()ing the host variable we passed to the databricks::DatabricksClient() function earlier, followed by "jobs/", followed by the job_id defined above.
    We can then create a job_run_link by paste0()ing together the job_link, followed by "/runs/", then the job_run_id from the previous step.
    We can then output the job_link as text at the bottom of the cell.

    job_link <- paste0(host,"jobs/",job_id)
    job_run_link <- paste0(job_link,"/runs/", job_run_id)
    job_link
  12. In a new cell, output the job_run_link.
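
    That cell only needs the single line:

    job_run_link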

    Output limits on code chunks

    Each chunk will display an output (assuming there is one) underneath the chunk once it has been run. Each chunk is limited to a single output though, which defaults to the last output generated.

    So if we had written a cell to output both links at the same time, we would still only see the job_run_link.
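
    As an illustration, a cell containing both lines would only render the second one:

    # only the last output is displayed under the chunk
    job_link
    job_run_link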

  13. Now click on the links and check they work.

  14. You’ve now created a workflow with code, and each time you re-run this notebook another workflow with the same name will be created. As this is a tutorial which most analysts may have to follow at some point, the logical conclusion is that we will end up with hundreds of ‘test jobs’ cluttering up the Workflows page.

    To avoid that, let’s use the databricks::jobsDelete() function to clean up after ourselves. All we need to do is pass the function the client and job_id variables from above.

    jobsDelete(client, job_id)
    Note

    If you have been running and re-running bits of this code iteratively, there’s a good chance you already have several instances of ‘test job’ listed under your name.

    If this is the case we’ll want to clean up each of these, ideally without having to manually click through the UI process for each one.

    To do this, firstly call the databricks::jobsList() function, passing it the client variable, and specifying the name of the jobs you want to list. Then filter the list to just the jobs with a creator_user_name of your email address. To see the resulting jobs use the display() function as below at the bottom of a code chunk.

    my_jobs <- jobsList(client, 
                        name = "test job") %>% 
                filter(creator_user_name == 'nicholas.treece@education.gov.uk')
    
    display(my_jobs %>% select(job_id, creator_user_name, run_as_user_name, created_time))

    We can now loop through the individual job_ids contained in my_jobs and use the databricks::jobsDelete() function to remove them all at once, programmatically.

    for(job_id in my_jobs$job_id){
      jobsDelete(client, job_id)
    }