I need to use nested checkpoints in Snakemake, because every dynamically created file in turn produces further dynamic files. So far, I have been unable to resolve the two checkpoints properly. Below is a minimal toy example.

It seems that the second checkpoint is not even executed until the first checkpoint has been resolved, so a single aggregate function won't work. I don't know how to invoke the two checkpoints and resolve their wildcards.



import os.path
import glob
rule all: 
  input: 
    'collect/all_done.txt'


#generate a number of files
checkpoint create_files:
  output: 
    directory('files')
  run: 
    import random
    r = random.randint(1,10)
    for x in range(r):
      output_dir = output[0] + '/' + str(x+1) 
      import os
      if not os.path.isdir(output_dir):
        os.makedirs(output_dir, exist_ok=True)
      output_file=output_dir + '/test.txt'
      print(output_file)
      with open(output_file, 'w') as f:
        f.write(str(x+1))

checkpoint create_other_files: 
  input: 'files/{i}/test.txt'
  output: directory('other_files/{i}/')
  shell: 
    '''
    L=$(( $RANDOM % 10))
    for j in $(seq 1 $L);
        do 
            mkdir -p {output}/$j
            cp -f {input} {output}/$j/test2.txt
        done
    '''


def aggregate(wildcards):
  i_wildcard = checkpoints.create_files.get(**wildcards).output[0]
  print('in_def_aggregate')
  print(i_wildcard)
  j_wildcard = checkpoints.create_other_files.get(**wildcards).output[0]
  print(j_wildcard)
  split_files = expand('other_files/{i}/{j}/test2.txt', 
    i =glob_wildcards(os.path.join(i_wildcard, '{i}/test.txt')).i, 
    j = glob_wildcards(os.path.join(j_wildcard, '{j}/test2.txt')).j
  )
  return split_files

# dummy collect rule
rule collect:
  input: aggregate
  output: touch('collect/all_done.txt')


Currently, I get the following error from snakemake:

Building DAG of jobs...
Using shell: /bin/bash
Provided cores: 1
Rules claiming more threads will be scaled down.
Job counts:
        count   jobs
        1       all
        1       collect
        1       create_files
        3

[Thu Nov 14 14:45:01 2019]
checkpoint create_files:
    output: files
    jobid: 2
Downstream jobs will be updated after completion.

Job counts:
        count   jobs
        1       create_files
        1
files/1/test.txt
files/2/test.txt
files/3/test.txt
files/4/test.txt
files/5/test.txt
files/6/test.txt
files/7/test.txt
files/8/test.txt
files/9/test.txt
files/10/test.txt
Updating job 1.
in_def_aggregate
files
[Thu Nov 14 14:45:02 2019]
Error in rule create_files:
    jobid: 2
    output: files

InputFunctionException in line 53 of /TL/stat_learn/work/feldmann/Phd/Projects/HIVImmunoAdapt/HIVIA/playground/Snakefile2:
WorkflowError: Missing wildcard values for i
Wildcards:

Removing output files of failed job create_files since they might be corrupted:
files
Shutting down, this might take some time.
Exiting because a job execution failed. Look above for error message


In the end, I want the files /other_files/{checkpoint_1_wildcard}/{checkpoint_2_wildcard}/test2.txt to be created.

I don't understand what you are trying to do. create_subdirectories and create_files will give an IOError, and when I try to run this code it complains that create_files exits non-zero. - Maarten-vd-Sande
As stated above, I have two consecutive jobs that produce dynamic output files, such that in the end the following files should be created: /{checkpoint_wildcard1}/{checkpoint_wildcard2}/file.txt. I was unable to write a correct aggregate function to resolve the checkpoint wildcards. The code was just a toy example. I will fix the IOError, though. - afh

1 Answer

I am not entirely sure what you were trying to do, so I rewrote it quite a bit. Does this clarify the problem?

import glob
import random
from pathlib import Path


rule all:
    input:
        'collect/all_done.txt'


checkpoint first:
    output:
        directory('first')
    run:
        for i in range(random.randint(1,10)):
            Path(f"{output[0]}/{i}").mkdir(parents=True, exist_ok=True)
            Path(f"{output[0]}/{i}/test.txt").touch()


checkpoint second:
    input:
        'first/{i}/test.txt'
    output:
        directory('second/{i}')
    run:
        for j in range(random.randint(1,10)):
            Path(f"{output[0]}/{j}").mkdir(parents=True, exist_ok=True)
            Path(f"{output[0]}/{j}/test2.txt").touch()


rule copy:
    input:
        'second/{i}/{j}/test2.txt'
    output:
        'copy/{i}/{j}/test2.txt'
    shell:
        """
        cp -f {input} {output}
        """


def aggregate(wildcards):
    # Resolve the first checkpoint, then list the {i} directories it created.
    outputs_i = glob.glob(f"{checkpoints.first.get().output[0]}/*/")
    outputs_i = [path.split('/')[-2] for path in outputs_i]
    split_files = []
    for i in outputs_i:
        # Resolve the second checkpoint once per i, then list its {j} directories.
        outputs_j = glob.glob(f"{checkpoints.second.get(i=i).output[0]}/*/")
        outputs_j = [path.split('/')[-2] for path in outputs_j]
        for j in outputs_j:
            split_files.append(f"copy/{i}/{j}/test2.txt")

    return split_files


rule collect:
    input:
        aggregate
    output:
        touch('collect/all_done.txt')
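
The nested lookup in aggregate can also be tried outside Snakemake. Below is a minimal plain-Python sketch of the same directory walk, assuming the directories already exist on disk (in the Snakefile, each listing would come after the matching checkpoints.first.get() or checkpoints.second.get(i=i) call). The helper names list_subdirs, nested_targets and demo are made up for this illustration and are not part of Snakemake.

```python
import tempfile
from pathlib import Path


def list_subdirs(directory):
    """Return the names of the immediate subdirectories, sorted."""
    return sorted(p.name for p in Path(directory).iterdir() if p.is_dir())


def nested_targets(first_dir, second_dir, target_dir="copy"):
    """Build one target path per (i, j) pair that exists on disk."""
    targets = []
    for i in list_subdirs(first_dir):
        # j is looked up separately for each i, so every i can have
        # a different set of j values
        for j in list_subdirs(Path(second_dir) / i):
            targets.append(f"{target_dir}/{i}/{j}/test2.txt")
    return targets


def demo():
    """Create a small hypothetical layout in a temp dir and list its targets."""
    with tempfile.TemporaryDirectory() as root:
        root = Path(root)
        layout = {"0": ["0", "1"], "1": ["0"]}  # i -> list of j
        for i, js in layout.items():
            (root / "first" / i).mkdir(parents=True)
            for j in js:
                (root / "second" / i / j).mkdir(parents=True)
        return nested_targets(root / "first", root / "second")


print(demo())
```

In this layout, i=1 only has j=0, so copy/1/1/test2.txt is not requested. A single expand() over separate lists of i and j would build the full cross product and ask for that non-existent file, which is why the loop over j sits inside the loop over i.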