BigDataScript (BDS)

BigDataScript is a scripting language specialized for NGS pipelines/workflows. Basic functions and syntax for BDS are documented here. The grammar is quite similar to Java, but there are no high-level data structures such as classes or multi-dimensional arrays.

BDS basics

You have basic variable types such as int, real, string and bool. You can skip the type in a declaration by using :=; for example, int n = 10 is equivalent to n := 10. Also, if you add a help context to a variable with help, it automatically becomes a command line parameter with that help text. For example, bds pipeline.bds -num_rep 3 -callpeak macs2 sets such parameters, and bds pipeline.bds -h shows help for all parameter variables.

int num_rep = 2		help # replicates (default: 2).
type 	:= "TF" 	help Type of chipseq pipeline; TF or histone (default: TF).

BDS also provides collection types such as map ({}) and array ([]). You can iterate over them with the following code:

int[] array = [1,2,3]

array.add(4)

for ( int n : array ) {
	...
}
for ( int i=0; i< array.size(); i++) { // C style
	n := array[i]
	...	
}

int{} map

map{"first"} = 1 	// adding element to a map
map{"second"} = 2

for ( int n : map ) {
	...
}

for ( string key : map.keys() ) {
	n := map{key}
	...
}
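
Since there are no multi-dimensional arrays, a common workaround (used later in this document for par function outputs) is a map with a composite string key:

int{} matrix

for ( int i=0; i<2; i++ ) {
	for ( int j=0; j<3; j++ ) {
		matrix{"$i,$j"} = i*3 + j 	// emulate matrix[i][j] with the key "i,j"
	}
}

n := matrix{"1,2"} 	// read back element (1,2)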

BDS syntax (basics)

Before moving on to the next sections, read carefully about BDS syntax, particularly sys, task, wait and par, here. There is more advanced syntax such as goal and dep, but it is currently not used for BDS pipelines in the Kundaje group (I actually found that goal and dep are buggy when combined with par). Parallelization of tasks is implemented using only sys, task, wait and par.

You can refer to any BDS variable in a shell command with the prefix $:

var1 	:= "test.txt"
var2 	:= "$var1.tmp"
var_3	:= "test2.txt"
var_4 	:= "$var_3"+"_tmp" 	// be careful when you use an underscore in a BDS variable name.
//var_4 := "$var_3_tmp" 	// error (var_3_tmp does not exist)

sys echo "input: $var1, $var2, $var_3, $var_4"		# sys is followed by an actual shell command.

The above BDS command is equivalent to the following shell commands. You can see that $var1 is replaced with test.txt.

#!/bin/bash
echo "input: test.txt, test.txt.tmp, test2.txt, test2.txt_tmp"

sys is for single-line shell command execution. Each sys command is executed independently in its own interactive bash shell (defined in ./bds.config) spawned by BDS. In the following example, two interactive shells are created by BDS, which means shell variables (not BDS ones) defined in one sys command cannot be seen by another.

sys echo "starting..."
sys echo "closing..."

BDS syntax: task

To make sys commands execute serially in the same interactive shell, use task. In such a task block, you can define shell variables that are shared within the shell. Now we need to distinguish between BDS variables and shell variables: you can refer to a shell variable as ${VAR_NAME}.

var1 	:= "test.txt"

task {
	sys echo "BDS var: $var1"
	
	sys var2="test2.txt"
	sys echo "Shell var: ${var2}"
}

It is important to understand the BDS operator <-, which compares file timestamps. In the following example, out<-in is true if any file in in or out is empty or missing, or if any file in out is older than any file in in. When the condition is false (task( false ) {}), the task is skipped. This is how a BDS pipeline can skip tasks that were already completed in a previous run.

in 	:= [input1, input2]
out 	:= [output] // both a string and an array of strings are allowed

task ( out<-in ) {

	sys echo "inputs : $input1, $input2"
	sys echo "output : $output"
	sys cat $input1 $input2 > $output
}
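
Both operands of <- can also be single paths rather than arrays. A minimal sketch with hypothetical file names:

in1 	:= "a.txt"
out1 	:= "a.copy.txt"

task ( out1 <- in1 ) { 	// string <- string: true if a.copy.txt is missing, empty or older than a.txt
	sys cp $in1 $out1
}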

You can also specify compute resource settings (number of threads, max. memory and walltime) for a task. There are special (system) predefined BDS variables for them. Do not try to redefine them in a global scope (or in a global function scope); it is recommended to write a function and define a task block inside it. If you skip any of those special variables, the default value (predefined in a global scope) will be used.

// Note that system variables are not defined with ':='.
// Defining them with ':=' or 'int cpus' will result in a redefinition error.

cpus 	= -1 	// to use a single processor, set -1; BDS will not pass the number of threads to the cluster engine (SGE, SLURM), so -1 is better than 1.
timeout = 3600 	// set the default walltime for all tasks to 3600 seconds.

func()

void func() {

	// Do not modify system variables in a global scope. Define new ones in a function scope.
	taskName 	:= "task name here"
	cpus 		:= 3 		// integer
	mem 		:= 3000000000 	// in bytes
	timeout 	:= 7200 	// in seconds
	system		:= "local" 	// system for a task; "local" : task runs on a UNIX thread, "sge" : task is submitted to Sun Grid Engine, ...

	task {

	}
}

BDS syntax for parallelization: task, wait and par

Parallelization of the pipeline can be implemented with task, wait and par. Consecutive tasks in a function are non-blocking. In the following example, the two tasks would be queued and executed at the same time without wait, which is not the expected behavior, so explicitly add wait between tasks if you want them serialized.

void map( int replicate_id ) {

	task {
		echo "align ($replicate_id)"
	}

	wait // without wait, two tasks will go in parallel

	task {
		echo "post-align ($replicate_id)"
	}
	
	wait
}

wait is sort of a barrier for parallel tasks: it waits for all tasks defined in the scope it belongs to. You can also get a task id (tid) as the return value of a task block and then pass the tid to wait to wait for that specific task.

void map( int replicate_id ) {

	tid := task {
		echo "align ($replicate_id)"
	}

	wait tid

	task {
		echo "post-align ($replicate_id)"
	}

	wait // wait for all tasks in this function scope to finish
}

Now we want to map two replicates in parallel. Prepend par to the function call. Note that wait in a global scope waits for all tasks in the pipeline. Therefore, the following code performs mapping of n replicates, waits for all mappings to finish, and then calls peaks.

par map(1)
par map(2)
...

// parallel tasks for n replicates
for (int i=3; i<=n; i++) {
	par map(i)
	// tid := par map(i), you can also get a task id for each replicate.
}

wait

callpeaks()

Note that a function called with par can only return a task id, even if it explicitly declares a return type.

ret := par sum( 1, 2 ) // ret is not 3; it's a task id for par sum()

int sum( int i, int j ) {
	return i + j
}

This is why parallelized BDS pipelines must use global variables to store output file names.

int{} ret 	// global variable to store outputs of sum(); a map is recommended here

par sum( 1, 2 ) // results go into the global map ret; the return value of par is a task id
par sum( 3, 4 )
par sum( 5, 6 )

void sum( int i, int j ) {

	key := "$i,$j" // make a key "i,j"
	ret{key} = i + j
}
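
After the global wait, all par tasks have finished and the map can be read back safely. Continuing the example above:

wait 	// barrier: all par sum() calls above have finished

for ( string key : ret.keys() ) {
	n := ret{key}
	println("sum($key) = $n")
}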

Hierarchy of BDS pipelines in Kundaje lab

BDS is not object-oriented, so the hierarchy of modules is implemented by including a parent module in a child module. In this hierarchy, each module works very much like a singleton class in an OOP language. All variables and functions in a module are global, so they can be referenced and called in any of the successor modules; it is therefore important to explicitly leave comments stating whether variables and functions are global or local. Typically, an underscore (_) is prepended to the names of such local variables and functions. A module is loaded only once per global scope, so variable declarations and function calls in a global scope are executed only once, when the module is first loaded.
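
A minimal sketch of these conventions (hypothetical module; the underscore prefix marks module-local names, and global-scope statements run only on first load):

// my_module.bds
include "module_template.bds"

data_file 	:= "" 	help Data file (global: visible to every module that includes this one)

_tmp_suffix 	:= ".tmp" 	// underscore prefix: local to this module by convention

string _add_tmp_suffix( string f ) { 	// local helper by convention
	return f + _tmp_suffix
}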

The hierarchy of basic modules (top to bottom: parent to child) are like the following:

string.bds 			# functions for string manipulation
├ git.bds 			# git info (latest commit, date) to represent pipeline version
└ sys.bds 			# system, file I/O, shell variables and BDS script paths
  └ conf.bds 			# functions to read cmd. line arg. or configuration/environment files
    ├ parallel.bds 		# parallelization (# of threads) and monitoring thread to manage tasks
    ├ cluster.bds 		# cluster information (kind of a cluster, max. memory, walltime)
    │ └ env.bds			# shell (env. modules and conda) environment
    └ output.bds 		# output directory and title, prefix
      └ filetable.bds 		# HTML filetable

[env.bds, output.bds]
 └ graphviz.bds 		# Graphviz diagram (SVG)

[graphviz.bds, filetable.bds]
 └ report.bds 			# HTML report generation, log/qc file parser, WashU browser track generation.

[git.bds, parallel.bds, report.bds]
 └ pipeline_template.bds 	# general pipeline template (all pipelines should start from including this)

[parallel.bds, report.bds]
 └ module_template.bds 		# general module template

All pipelines (genomic/non-genomic) should include pipeline_template.bds. The following hierarchy is for genomic pipelines and the advanced modules for them.

conf.bds
├ species.bds 			# species definition, species specific parameters and functions to read species files
├ input_fastq.bds 		# parse cmd. line arg. or read config. file to get fastqs (-fastq ...)
├ input_bam.bds 		# parse cmd. line arg. or read config. file to get bams (-bam ...)
├ input_tagalign.bds  		# parse cmd. line arg. or read config. file to get tagaligns (-tag...)
├ input_peak.bds 		# parse cmd. line arg. or read config. file to get peaks (-peak ...)
├ input_adapter.bds 		# parse cmd. line arg. or read config. file to get adapters (-adapter ...)
└ align_multimapping.bds 	# multimapping parameter

[input_fastq.bds, input_bam.bds, input_tagalign.bds, input_peak.bds]
 └ input.bds 			# manages all input genomic file types (exp. replicates and controls) and endedness (SE/PE)

[module_template.bds, species.bds]
 ├ align_bwa.bds  		# map fastqs to get raw bam, bwa parameters, resource settings
 ├ align_trim_adapters.bds  	# trim adapters from fastqs, trimming parameters and resource settings
 ├ postalign_bed.bds 		# postalign functions for bed and tagaligns (including cross-corr. analysis)
 ├ callpeak_spp.bds  		# peak calling with spp
 ├ callpeak_macs2.bds  		# peak calling with macs2 (macs2 functions for chipseq and atacseq)
 ├ callpeak_naive_overlap.bds  	# naive overlap on peaks called
 ├ callpeak_filter.bds  	# filter top peaks from peak files
 ├ callpeak_idr.bds  		# IDR and QC functions
 └ signal.bds  			# signal track generation using deepTools (bamCoverage) and Anshul's align2rawsignal.

# the following two modules share align_multimapping.bds for the parameter '-multimapping'

[module_template.bds, species.bds, align_multimapping.bds]
 ├ align_bowtie2.bds  		# bowtie2 parameters, bowtie2 resource settings, align fastqs to get raw bam
 └ postalign_bam.bds 		# postalign functions for bam (including filtering bam and bam_to_tagalign...)

species.bds includes lots of parameters and data file paths (such as bwa and bowtie indices) frequently used in genomic pipelines. input.bds can read input genomic files (fastq, bam, …) from command line arguments or a configuration file. Therefore, genomic modules must include module_template.bds and species.bds, and genomic pipelines must include pipeline_template.bds and input.bds.

Module template

include "species.bds"
include "module_template.bds"

// variables globally used in all child modules (typically you want them to be command line parameters)
globalvar1 := ""	help Data file
glovalvar2 := 1 	// not a comand line parameter

// variables locally used in this module
localvar1 := 3


init_module_name() // initialization for a module (called only once)

chk_module_name() // check if parameters are consistent and correct, and all required data files exist (called only once)

// functions locally used in this module

void init_module_name() {

	// read from configuration file if it exists
	// use get_conf_val() for string, get_conf_val_int() for integer, and so on

	globalvar1 	= get_conf_val( globalvar1,		["globalvar1"] ) 	// key name for the parameter is globalvar1
	globalvar2 	= get_conf_val_int( globalvar2, 	["globalvar2"] )	// key name for the parameter is globalvar2
	...
}

void chk_module_name() {

	if ( !path_exists( globalvar1 ) ) error("globalvar1 ($globalvar1) does not exist!")
	...
}

// functions globally used in all child modules

void func1() {
}

// more functions
...

Global functions typically have the following structure. wait_par() and register_par() are for optimized parallelization according to -nth (the total number of threads available for the whole pipeline). wait_par() waits while the number of threads used by all running tasks is at the limit (defined by -nth). register_par() registers a task id and the task's thread count with the pipeline's monitoring thread, which counts the total number of threads running.

mem_func1 	:= "3G" 	help Max. memory for func1
wt_func1 	:= "23h"	help Walltime for func1


// input, output_dir, group, # threads
string func1( string input, string o_dir, string group, int nth_func1 ) {

	// get prefix path for input (remove extension of input and change its directory)
	prefix := replace_dir( rm_ext( input, ["ext1","ext",...] ), o_dir )

	// manipulate input file name for intermediate files used in a task block
	temp1 := "$prefix.tmp"
	...

	output := "$prefix.out"
	
	in 	:= [ input ] 				// make sure that 'in' is an array
	out 	:= output 				// for multiple outputs, use an array and change function return type to string[]
							// usage: (ret1, ret2, ...) = _funct( ... )

	taskName:= "taskname here"
	cpus 	:= nth_func1				// # threads for this task, skip this line for a single thread
	mem 	:= get_res_mem(mem_func1,nth_func1)	// get_res_mem() parses a string mem_func1 and divide it by nth_func1
	timeout := get_res_wt(wt_func1) 		// get_res_wt() parses the walltime string wt_func1

	wait_par( cpus )

	tid := task ( out<-in ) {

		sys $shcmd_init		# initialization shell command for all tasks
		//sys $shcmd_init_py3	# initialization shell command for all tasks using python3

		sys ...

		sys $shcmd_finalize		
	}

	register_par( tid, cpus )

	//add_task_to_graph( in, out, group ) // node-A -> node-B: use this for a task without a box
	add_task_to_graph( in, out, group, box_name, box_color ) // node-A -> box -> node-B: use this for a task with a box

	return out
}
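
As a concrete illustration of the prefix manipulation above (hypothetical paths; rm_ext() strips a known extension and replace_dir() swaps the directory, per their usage here):

input 	:= "/data/rep1.fastq.gz"
o_dir 	:= "out/align"

prefix 	:= replace_dir( rm_ext( input, ["fastq.gz","fastq"] ), o_dir )
// rm_ext( input, ... ) 	-> "/data/rep1"
// replace_dir( ... ) 		-> "out/align/rep1"

output 	:= "$prefix.out" 	// "out/align/rep1.out"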

You can omit the resource-related variables (such as mem_func1 and wt_func1) and the system variable definitions (cpus, mem and timeout) for single-threaded tasks.

// if out is an array of strings, use string[] func2( ... )
string func2( string input, string o_dir, string group ) {

	// get prefix path for input (remove extension of input and change its directory)
	prefix := replace_dir( rm_ext( input, ["ext1","ext",...] ), o_dir )

	// manipulate input file name for intermediate files used in a task block
	temp1 := "$prefix.tmp"
	...

	output := "$prefix.out"
	
	in 	:= [ input ] 				// make sure that in is an array
	out 	:= output

	taskName:= "taskname here"

	wait_par( cpus ) // cpus is already defined as 1 (or -1) in parallel.bds

	tid := task ( out<-in ) {

		sys $shcmd_init		# initialization shell command for all tasks
		//sys $shcmd_init_py3	# initialization shell command for all tasks using python3
		
		...

		sys $shcmd_finalize
	}

	register_par( tid, cpus )

	//add_task_to_graph( in, out, group ) // node-A -> node-B: use this for a task without a box
	add_task_to_graph( in, out, group, box_name, box_color ) // node-A -> box -> node-B: use this for a task with a box

	return out
}

If you need very short and light tasks, use the following template. These tasks will not be counted by the monitoring thread that limits the number of running threads to -nth. However, I recommend using BDS built-in functions instead of a task block to keep things simple and clear.

void func3() {

	system := "local" 	// this task will not be sumitted to a cluster engine (even though you specify it in the command line)

	task {
		sys $shcmd_init		# initialization shell command for all tasks
		//sys $shcmd_init_py3	# initialization shell command for all tasks using python3

		...		

		sys $shcmd_finalize
	}
}
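
For instance, simple file operations can be done with BDS built-ins such as read(), write() and exists(), with no task block at all (a sketch with hypothetical file names):

in_file 	:= "test.txt"
out_file 	:= "copy.txt"

content := in_file.read() 	// read a whole file into a string (BDS built-in)
out_file.write( content ) 	// write a string to a file (BDS built-in)

if ( !out_file.exists() ) error("$out_file was not created")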

Pipeline template

include "modules/pipeline.bds"
include "modules/any_module_you_want_to_include.bds"
...

main()

// global scope

void main() { 

	// global function scope
	
	init()
	chk_input()
	align()
	call_peaks()
	idr()
	report()	
}

void init() {...}

void chk_input() {...}

void align() {

	...

	par align(1)		// aligning for replicate 1
	par align(2) 		// aligning for replicate 2
	...

	wait				
}

void align( int rep_id ) {

	...
}

void call_peaks() {

	...
	wait 			// `wait` here is fine because no `par` functions are called before it.
}

HTML Report (Graphviz diagram and file table)

The AQUAS pipeline provides a module for a nice HTML report (./modules/report.bds). All functions with a task block (task {...}) defined in ./modules/*.bds are designed to automatically construct a graph with nodes (files) and arrows (data flow). However, a node (file) is hidden unless it is explicitly registered for the report with add_file_to_report() or add_file_to_graph(). add_file_to_report() registers a node to both the Graphviz diagram and the file table; add_file_to_table() registers a node to the file table only; add_file_to_graph() registers a node to the Graphviz diagram only.

All functions with a task block (task {...}) defined in ./modules/*.bds take group as their last parameter. group is an additional parameter for the Graphviz diagram, equivalent to cluster {} in Graphviz: nodes (files) with the same group are clustered together, and the cluster is labeled with the group name. Use add_label_to_graph( string group, string long_name ) to show a full name instead of group.
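
For example, if tasks were registered with a short group name like "rep1", you can display a longer label on the diagram:

add_label_to_graph( "rep1", "Replicate 1" ) 	// show "Replicate 1" instead of "rep1" on the Graphviz diagram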

include "modules/report.bds"

main()

void main() {

	init_filetable()

	align()

	report()
}

void init_filetable() {

	// the hierarchy of an item must be explicitly defined in a string, e.g. "Alignment/Replicate 1/Fastq", for the file table.
	// how to order items?
	// for example, you may want to put "Alignment", "Replicate 1" and "Fastq" first.
	// Alignment
	// ├ Replicate 1
	// ├ ├ Fastq
	// ├ └ Bam
	// └ Replicate 2
	// Peaks

	// first added first shown

	// for level 1
	add_label_to_table("Alignment")
	add_label_to_table("Peaks")

	// for level 2
	add_label_to_table("Replicate 1")
	add_label_to_table("Replicate 2")

	// for level 3
	add_label_to_table("Fastq")
	add_label_to_table("Bam")
}

void align() {

	fastq := get_fastq( 1 ) // get fastq for replicate 1

	add_file_to_report( fastq, "fastq", "Replicate 1", "Alignment/Replicate 1/Fastq" )
	// description for parameters
	// fastq : file path for the fastq
	// "fastq" : label of the node
	// "Replicate 1" : group of the node
	// "Alignment/Replicate 1/Fastq" : hierarchy in the file table

	// if you want to show fastq node on Graphviz diagram only (not on the file table)
	// add_file_to_graph( fastq, "fastq", "Replicate 1" )

	// if you want to show fastq item on the file table only (not on the Graphviz diagram)
	// add_file_to_table( fastq, "Alignment/Replicate 1/Fastq" )

	bam := bwa( fastq )

	// bam will not be shown on the graph but on the filetable
	add_file_to_table( bam, "Alignment/Replicate 1/Bam" )

	...
}

void report() {

	html := html_filetable() 	// treeview for directory and file structure 

	html += ... 			// add more contents for your report (format=HTML; <div> ... </div>)

	html += html_graph()		// Graphviz workflow diagram

	report( html )	
}

Bugs in BDS

Thread safety issue for global variables

Remember that all tasks (task {}) in a par function go in parallel. As explained above, parallelized BDS pipelines need global variables to store output file names (typically a map of string keyed by replicate id). There is a bug in handling global variables (locking/unlocking them): reading (as an r-value) and writing (as an l-value) a global variable in a par function will result in a crash of the pipeline. A hacky way to prevent this problem is to never read global variables in a par function.

string{} bam, filt_bam, tagalign, peak	// global variables to store pipeline outputs (filenames)

// aligning three replicates
par align_OKAY( 1 )
par align_OKAY( 2 )
par align_OKAY( 3 )
...

wait

callpeak_OKAY()

wait

void align_ERROR( int replicate_id ) {

	key := replicate_id

	fastq := get_fastq( key )

	bam{key} = _bwa( fastq ) // it's okay to write on bam{}

	wait 	// without this `wait`, _bwa() and _dedup_bam() would go in parallel, which is not the expected behavior

	filt_bam{key} = _dedup_bam( bam{key} ) // it's NOT OKAY to read bam{key}

	wait 

	tagalign{key} = _bam_to_tag( filt_bam{key} ) // it's NOT OKAY to read filt_bam{key}
}

void align_OKAY( int replicate_id ) {

	key := replicate_id

	fastq := get_fastq( key )

	bam_ := _bwa( fastq )
	bam{key} = bam_

	wait

	filt_bam_ := _dedup_bam( bam_ ) // it's SAFE since bam_ is a local variable
	filt_bam{key} = filt_bam_

	wait 

	tagalign_ := _bam_to_tag( filt_bam_ ) // it's SAFE since filt_bam_ is a local variable
	tagalign{key} = tagalign_
}

void callpeak_OKAY() {
	
	// we can safely read tagalign{} here since all par tasks have finished due to the `wait` in the global scope

	peak_ := macs2( tagalign{1} )	// this is safe; you don't need a temporary variable for thread safety

	...
}

Buggy syntax

goal and dep seem to be buggy (the pipeline crashes) when combined with the par prefix. Stay away from this syntax.

Installer for dependencies

Customize install_dependencies.sh for your own pipeline.

ENV_NAME=your_pipeline_name 		# you also need to modify [default] section in ./default.env
ENV_NAME_PY3=your_pipeline_name_py3 	# you also need to modify [default] section in ./default.env
INSTALL_WIGGLER_AND_MCR=0 		# if it's 1, install MCR (Matlab Compiler Runtime), this can take >500MB disk space

Installer for genome data

Step 1) Customize install_genome_data.sh for your own pipeline. If you are modifying existing pipelines (chipseq_pipeline or atac_dnase_pipeline), skip this step.

SPECIES_FILE_BASENAME=your_pipeline_species.conf 	# species file name that will be put into [default] section in ./default.env
CONDA_ENV=your_pipeline_name 				# ENV_NAME that you defined in the previous section
BUILD_BWT2_IDX=1 					# if it's 1, build bowtie2 index
BUILD_BWA_IDX=0 					# if it's 1, build bwa index

Step 2) You can add any genome to the genome data installer. Bowtie2/bwa indices will be built from $REF_FA. The minimum requirement for a genome database entry is a URL for $REF_FA; leave the others blank if you don't have them. $REF_FA can be .fa.gz, .fa, .fa.bz2 or .2bit.

Add the following to the end of the first if statement in the shell script.

elif [ $GENOME == "species_name" ]; then

  REF_FA= # reference genome sequence (.fa.gz, .fa, .fa.bz2 or .2bit)
  BLACKLIST= # blacklist to filter out peaks in IDR
  SPECIES_BROWSER=    # specify species name if it does not match with that in WashU browser (e.g. hg38 for hg38_ENCODE)
  UMAP= # unique mappability tracks https://personal.broadinstitute.org/anshul/projects/umap

  # data for ATAQC
  TSS_ENRICH=
  DNASE=
  PROM=
  ENH=
  REG2MAP=
  ROADMAP_META=

fi
...