Tutorial

This tutorial is a practical guide for new XPFlow users. You will learn, step by step, how to orchestrate a moderately complex Grid’5000 experiment using XPFlow. The final experiment will reserve Grid’5000 nodes, deploy them using Kadeploy, and compute the latency matrix between the nodes.

As the tutorial illustrates XPFlow with an experiment on Grid’5000, basic knowledge of Grid’5000 is expected. It is also expected that you have gone through the slides to understand the main XPFlow concepts.

Setting up your work environment

The first step is to install XPFlow in your work environment. There are two ways to use XPFlow: on a Grid’5000 frontend, or installed on your own computer.

We recommend running XPFlow on a Grid’5000 frontend:

$ gem install --user-install --pre xpflow
$ export PATH=$PATH:$(ruby -e 'puts "#{Gem.user_dir}/bin"')

To make the xpflow command always available, add the last line to your ~/.bashrc.

On your computer, the easiest way is to use the Ruby gem (be sure to have Ruby and RubyGems installed):

# gem install xpflow --pre

The XPFlow gem provides a main command, xpflow, which is its primary interface. Make sure you can launch it:

$ xpflow help

Your very first XPFlow experiment

Create a helloworld.rb file and paste the following code.

# file: helloworld.rb
process :main do
    log "Hello World"
end

Now, execute it:

$ xpflow helloworld.rb

In this example, we created a trivial process (:main) which calls the built-in activity log. This activity writes its arguments to the experiment log.

By default, the process named :main is executed as the entry point. One can override it by specifying the entry point explicitly:

$ xpflow helloworld.rb:main

If you encounter errors during execution, verbose logging may help you spot the problem:

$ xpflow -v helloworld.rb:main

XPFlow can visualize processes. There are a few supported output formats, but let us use PDF (you will need the cairo gem installed for this to work):

$ xpflow workflow helloworld.rb -o helloworld.pdf

Obviously, this will be more useful for more complex workflows.

Interacting with Grid’5000

XPFlow can access Grid’5000 transparently from any location. If it is executed at a frontend node, it will access the nodes directly. When it is executed outside Grid’5000, a proxy is used. From the user’s point of view, this mechanism is transparent.

Node reservation

The XPFlow activities to interact with Grid’5000 must be loaded with the use directive.

The following script reserves one node on the Nancy site for 10 minutes. Notice, however, that after the script finishes, the reservation is automatically discarded by the framework (use -v to see more details).

# file: g5k-ex1.rb
use :g5k
process :main do
    job_id = g5k_reserve_nodes(:site => 'nancy', :nodes => 1, :time => '00:10:00')
    nodes = g5k_nodes(job_id)
    log nodes
end

Automatic detection of G5K jobs

Sometimes it may be easier to reserve nodes manually (using oarsub) and then use them from XPFlow. The g5k_get_avail activity can be used for this purpose. Make a short reservation at the Nancy site:

$ oarsub -l nodes=2,walltime=00:20:00 'sleep 1d'

Do not cancel this reservation, as it will be used in the following examples.

You can use this job from your XPFlow script:

# file: g5k-ex2.rb
use :g5k
process :main do
    job = g5k_get_avail :site => 'nancy'
    log(job)
end

Executing commands

One can execute commands on the local machine using the system activity. In that case the standard output of the command is returned.

# file: process-ex1.rb
process :main do
    r = system("hostname")
    log(r)
end

One can execute commands on remote machines using the execute_one activity. The result of this activity is returned and can be logged with log.

# file: process-ex2.rb
use :g5k
process :main do
    job_id = g5k_reserve_nodes(:site => 'nancy', :nodes => 1, :time => '00:10:00')
    node = g5k_nodes(job_id).first
    r = execute_one(node, "hostname")
    log(r)
end

The standard output of the command is not displayed by log, contrary to what happens with system. Instead, both streams (output and error) are attributes of the object returned by execute_one. The standard way to access them inside a process is a special pattern of the form attribute_of or object.attribute, where attribute is the name of the property you want to access.

In our example we will use r.stdout. The example becomes:

# file: process-ex3.rb
use :g5k
process :main do
    job_id = g5k_reserve_nodes(:site => 'nancy', :nodes => 1, :time => '00:10:00')
    node = g5k_nodes(job_id).first
    r = execute_one(node, "hostname")
    log(r.stdout)
end

Structuring your experiment in processes and activities

The workflows (processes in XPFlow parlance) are written in a domain-specific language which is a restricted, semi-formal way to describe workflows. The low-level parts of the workflow, activities, may contain arbitrary Ruby code.

Activities are not restricted the way processes are. However, activities can call other activities too. For example, one can use execute_one inside an activity:

# file: activity-ex1.rb
activity :hostname do |node|
    r = execute_one(node, "hostname")
    r.stdout
end
process :main do
    log(hostname(localhost))
end

It is recommended to keep low-level, technical elements inside well-defined activities and write high-level workflows as processes that call activities (or other processes).

Execute ping and analyze its output

As described before, the activities should be primarily used to wrap low-level actions. In the example below, an activity is defined to wrap the execution of the ping utility. That activity is then called from the main process.

# file: ping-ex1.rb
activity :ping do |from|
    r = execute_one(from, "ping localhost -c 1")
    r.stdout[/time=(\d+.*) /, 1].to_f
end
process :main do
    r = ping(localhost)
    log r
end
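The regular expression used in :ping can be checked against a sample line of ping output in plain Ruby (the sample line below is illustrative):

```ruby
# A typical line printed by ping (sample, for illustration):
line = "64 bytes from localhost (127.0.0.1): icmp_seq=1 ttl=64 time=0.045 ms"

# The same extraction as in the :ping activity: capture the digits
# (and decimals) after "time=" up to the space before the unit.
latency = line[/time=(\d+.*) /, 1].to_f

puts latency  # => 0.045
```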

Note that for convenience, the XPFlow standard library already includes activities ping_node and ping_localhost.

Instrumentation of processes

Activities can be overridden and instrumented (which, in our case, means changing their structure). This is useful when you want to change existing processes unintrusively.

In the following example, we override the execute_one activity which is defined in the standard library.

# file: exec_overwrite.rb
# override existing activity
activity :execute_one do |node, cmd|
    log("Execution of '#{cmd}' on '#{node.host}'")
    parent(node, cmd)
    log("End of '#{cmd}' on '#{node.host}'")
end

Launch the example with xpflow activity-ex1.rb exec_overwrite.rb and notice how the output differs from xpflow activity-ex1.rb.

Checkpointing and restarting workflows

Checkpointing

XPFlow has a simple mechanism for checkpointing the state of an experiment. The checkpoint DSL pattern will store the state of the current workflow execution to a file so that it can be used later to restore the execution context. Note that the checkpoint does not store the state of the platform.

There are two reasons to use checkpointing. First, it helps you write your experiments incrementally: you can add a checkpoint each time you consider a part of your process finished, and XPFlow will gracefully restart from the last checkpoint. Second, checkpoints can be added at strategic places to save time. Such a strategic place is typically just after a time-consuming operation that would otherwise have to be restarted from scratch.

A simple example of checkpoint use is:

# file: checkpoint-ex1.rb
process :main do
    date = system("date")
    checkpoint(:freezer)
    date2 = system("date")
    checkpoint(:freezer2)
    log(date)
    log(date2)
end

When you relaunch this script, you will see that the dates remain unchanged after the first execution.

When you launch XPFlow, you can restart from the beginning with the -i flag, or select the checkpoint from which XPFlow will restart with the -c flag.

For example, xpflow -i checkpoint-ex1.rb prints the current date twice, while xpflow -c freezer checkpoint-ex1.rb prints the old, checkpointed date followed by the current date.

Using checkpointing after Grid’5000 deployment

Assuming your reservation is of the :deploy type (which is the case if you are still using the reservation from the previous example), you can deploy the reserved nodes using Kadeploy. It is customary to add a checkpoint just after g5k_kadeploy so that the deployment is not restarted every time the experiment is launched. Setting the :keep property to true keeps all reservations at the end of the experiment.

# file: g5k-ex3.rb
use :g5k
process :main do
    job = g5k_reserve_nodes(:site => 'nancy', :nodes => 10,
            :time => '01:00:00', :keep => true, :type => :deploy)
    nodes = g5k_kadeploy(job, "wheezy-x64-nfs")
    checkpoint(:deployed)
end

After you launch this experiment, you can add more elements to your process without restarting it from scratch.

Modify this example as follows:

# file: g5k-ex4.rb
use :g5k
process :main do
    job = g5k_reserve_nodes(:site => 'nancy', :nodes => 10,
            :time => '01:00:00', :keep => true, :type => :deploy)
    nodes = g5k_kadeploy(job, "wheezy-x64-nfs")
    checkpoint(:deployed)
    r = execute(nodes,"hostname")
    forall r do |data|
       log(data.stdout)
    end
end

Advanced workflow patterns

This section briefly introduces more advanced patterns available in XPFlow. A complete list of patterns is available on a dedicated page. With any example in this tutorial, you can use the -g flag to see a Gantt diagram of the experiment execution; it may help you understand the details of execution. The Gantt diagram data can also be exported to a file, using the -G switch (see help for details).

The main goal of this section is to build an experiment that measures latency between nodes using the ping command.

Sequential loop

Grid’5000 has many sites: Bordeaux, Grenoble, Lille, Luxembourg, Lyon, Nancy, Nantes, Reims, Rennes, Sophia and Toulouse.

We will iterate over the sites to make one reservation at each site. Unfortunately, a reservation may fail for various reasons (e.g., no nodes available, ongoing maintenance). For this reason, we set the :ignore_errors flag to true so that an error does not make the whole experiment fail (the default behavior).
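As a plain-Ruby sketch (an analogy, not XPFlow's implementation), the :ignore_errors semantics amount to catching each iteration's failure and continuing with the remaining items; the site names below are only illustrative:

```ruby
# Plain-Ruby sketch (not XPFlow internals) of :ignore_errors => true:
# a failing iteration is skipped instead of aborting the whole loop.
# The site names below are made up for illustration.
sites = ["nancy", "unavailable-site", "rennes"]

reserved = sites.map do |site|
  begin
    raise "no nodes available" if site == "unavailable-site"
    "node-at-#{site}"
  rescue StandardError
    nil  # ignore the failure and move on to the next site
  end
end.compact
```

Only the successful reservations end up in the result list.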

# file: ping_exp_ex1.rb
use :g5k

process :reserve_nodes do
    sites = g5k_sites()
    foreach(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes(:site => site, :nodes => 1, :time => '01:00:00')
        g5k_nodes(job_id).first
    end
end

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

process :main do
    nodes = reserve_nodes()
    raw_data = foreach nodes do |from|
        log(from)
        parse_output(execute_one(from,"ping -c 1 nancy"))
    end
    log(raw_data)
end

Parallel patterns

You may notice that doing multi-site reservations sequentially is rather slow. For that reason, XPFlow features parallel patterns to run sub-processes in parallel.

For example, one can use the parallel pattern to run two workflows concurrently:

# file: parallel-ex1.rb
activity :sleep2 do |s, name|
    log "#{name} starts."
    sleep(s)
    log "#{name} ends."
end

process :main do
    parallel do
        sleep2 2, 'one'
        sequence do
            sleep2 1, 'two'
            sleep2 2, 'three'
        end
    end
end

Notice how the executions of the sleep2 activity are interleaved.

Parallel loop

Now, we show how to use parallel loops in our experiment. We will replace foreach with forall, which does exactly the same thing, except that each loop iteration is executed in parallel.

The example becomes:

# file: ping_exp_ex2.rb
use :g5k

process :reserve_nodes do
    sites = g5k_sites()
    forall(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes(:site => site, :nodes => 1, :time => '01:00:00')
        g5k_nodes(job_id).first
    end
end

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

process :main do
    nodes = run(:reserve_nodes)
    raw_data = forall nodes do |from|
        log(from)
        parse_output(execute_one(from, "ping -c 1 nancy"))
    end
    log(raw_data)
end

Compare the execution time between the forall and foreach patterns.
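The difference can be sketched in plain Ruby (an analogy, not XPFlow's implementation): foreach behaves like a sequential map, while forall runs each iteration in its own thread and collects the results:

```ruby
items = [1, 2, 3]

# foreach-like: iterations run one after another.
sequential = items.map { |i| i * 2 }

# forall-like: one thread per iteration, results collected in order.
parallel = items.map { |i| Thread.new { i * 2 } }.map(&:value)

raise unless sequential == parallel  # same results, different schedule
```

The results are identical; only the scheduling changes, which is why swapping foreach for forall requires no other modification to the script.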

Now, let us add a nested parallel loop that will execute the ping command between each pair of nodes.

# file: ping_exp_ex3.rb
use :g5k

process :reserve_nodes do
    sites = g5k_sites()
    forall(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes(:site => site, :nodes => 1, :time => '01:00:00')
        g5k_nodes(job_id).first
    end
end

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

process :main do
    nodes = run(:reserve_nodes)
    raw_data = forall nodes do |from|
        forall nodes do |dest|
            parse_output(execute_one(from, "ping -c 1 #{dest.host}"))
        end
    end
    log(raw_data)
end

We have our results now, but they are not attributed to any node. In the example below we attach the address of the destination node to each measurement. Each result is therefore a pair consisting of the destination host and the output of the ping command.

# file: ping_exp_ex4.rb
use :g5k

process :reserve_nodes do
    sites = g5k_sites()
    forall(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes(:site => site, :nodes => 1, :time => '01:00:00')
        g5k_nodes(job_id).first
    end
end

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

process :main do
    nodes = run(:reserve_nodes)
    raw_data = forall nodes do |from|
        forall nodes do |dest|
            output = parse_output(execute_one(from, "ping -c 1 #{dest.host}"))
            value([dest.host, output])
        end
    end
    log(raw_data)
end

TakTuk

Our previous experiment works fine, but in a large-scale context the standard execution method (which uses SSH connections) may not scale. The solution is to use a tool such as TakTuk. XPFlow provides two activities based on TakTuk: execute_many and execute_many_here. The difference lies in where the results of the execution are stored: with execute_many, the output streams are stored on the remote nodes, while with execute_many_here they are automatically transferred to the node where XPFlow is running. Despite this semantic difference, both activities are interchangeable.

Our example becomes:

# file: ping_exp_ex5.rb
use :g5k

process :reserve_nodes do
    sites = g5k_sites()
    forall(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes(:site => site, :nodes => 1, :time => '01:00:00')
        g5k_nodes(job_id).first
    end
end

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

process :main do
    nodes = run(:reserve_nodes)
    raw_data = forall nodes do |dest|
        value([ dest.host, execute_many_here(nodes, "ping -c 1 #{dest.host}") ])
    end
    log(raw_data)
end

The log of this experiment does not contain useful information yet. We need to create an activity to handle the results of execute_many.

# file: ping_exp_ex6.rb
use :g5k

process :reserve_nodes do
    sites = g5k_sites()
    forall(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes(:site => site, :nodes => 1, :time => '01:00:00')
        g5k_nodes(job_id).first
    end
end

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

activity :parse_output_exec_many do |raw_data|
    raw_data.to_list.map do |rd|
        [ rd.node.host, run(:parse_output,rd) ]
    end
end

process :main do
    nodes = run(:reserve_nodes)
    raw_data = forall nodes do |dest|
        rd = execute_many_here(nodes, "ping -c 1 #{dest.host}")
        rd_parsed = run(:parse_output_exec_many, rd)
        value([ dest.host, rd_parsed ])
    end
    log(raw_data)
end

Final experiment

In this section we format the results of the previous experiment and show how to save them.

In the previous example, each array contained the destination hostname and an array of ping measurements from different nodes. In this section we present an experiment that saves a list of triplets, each containing a source host, a destination host, and the round-trip time between them.
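In plain Ruby, the transformation performed by handle_results looks like this (the hostnames and values below are made up for illustration):

```ruby
# Each entry pairs a destination host with per-source measurements
# (illustrative data, not real Grid'5000 hostnames).
raw_data = [
  ["dest-a", [["src-1", 4.2], ["src-2", 11.7]]],
  ["dest-b", [["src-1", 3.1]]]
]

# Flatten the pairs into (source, destination, rtt) triplets.
triplets = raw_data.flat_map do |dest, results|
  results.map { |src, rtt| [src, dest, rtt] }
end

puts triplets.inspect
# => [["src-1", "dest-a", 4.2], ["src-2", "dest-a", 11.7], ["src-1", "dest-b", 3.1]]
```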

# file: final.rb
use :g5k

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

activity :handle_results do |raw_data|
    data = []
    raw_data.each do |rd|
        rd[1].each do |byexp|
            data << [byexp.node.host, rd[0].host, run(:parse_output, byexp)]
        end
    end
    data
end

process :reserve_nodes do
    sites = g5k_sites()
    nodes = forall(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes :site => site, :nodes => 1, :time => '01:00:00'
        g5k_nodes(job_id).first
    end
end

process :main do
    nodes = run(:reserve_nodes)
    raw_data = forall nodes do |dest|
        value([ dest, execute_many_here(nodes, "ping -c 1 #{dest.host}") ]) 
    end
    data = handle_results(raw_data)
    forall data do |triplet|
        log(triplet)
    end
end

Saving your results

To finish this tutorial, we show how to save the data of the experiment with the result pattern. This pattern takes a filename as its argument. If the file does not exist, a subworkflow is executed and its result is saved to the given file (in YAML format). If the file exists, the subworkflow is not executed; its result is loaded from the file instead (similarly to checkpointing).
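The behavior of the result pattern can be sketched in plain Ruby as file-based memoization in YAML (a simplification of the assumed behavior, not XPFlow's actual code):

```ruby
require 'yaml'

# Sketch of the result pattern: run the block only when the file is
# missing; otherwise load the previously saved result from it.
def result_pattern(filename)
  return YAML.load_file(filename) if File.exist?(filename)
  data = yield
  File.write(filename, YAML.dump(data))
  data
end

file = "results-demo.yml"
first  = result_pattern(file) { [["src", "dest", rand]] }  # executed, saved
second = result_pattern(file) { raise "never reached" }    # loaded from file

raise unless first == second
File.delete(file)  # clean up the demo file
```

Note that the second block is never executed: its value comes entirely from the file.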

# file: final_saved.rb
use :g5k

activity :parse_output do |r|
    r.stdout[/time=(\d+.*) /, 1].to_f
end

activity :handle_results do |raw_data|
    data = []
    raw_data.each do |rd|
        rd[1].each do |byexp|
            data << [byexp.node.host, rd[0].host, run(:parse_output, byexp)]
        end
    end
    data
end

process :reserve_nodes do
    sites = g5k_sites()
    nodes = forall(sites, :ignore_errors => true) do |site|
        job_id = g5k_reserve_nodes(:site => site, :nodes => 1, :time => '01:00:00')
        g5k_nodes(job_id).first
    end
end

process :main do
    data = result 'results.yml' do 
        nodes = run(:reserve_nodes)
        raw_data = forall nodes do |dest|
            value([ dest, execute_many_here(nodes, "ping -c 1 #{dest.host}") ]) 
        end
        handle_results(raw_data)
    end
    forall data do |triplet|
        log(triplet)
    end
end

Going further

External variables

The experiment can be parametrized with external variables that are passed to XPFlow via the special function var. For example:

# file: var-ex1.rb
process :main do
    log "Hello #{var(:firstname)} #{var(:name)}"
end

Two variables (:firstname and :name) are defined; they are treated as strings by default. One can specify their values using the -V switch:

$ xpflow var-ex1.rb -V name=Smith,firstname=Henri

or

$ xpflow var-ex1.rb
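The -V syntax maps names to values. In plain Ruby, parsing such an argument could look like this (an illustration, not XPFlow's actual parser):

```ruby
# Parse a "-V" value such as "name=Smith,firstname=Henri" into a hash
# (illustrative sketch; XPFlow's real parsing may differ).
def parse_vars(spec)
  spec.split(',').map { |pair| pair.split('=', 2) }.to_h
end

vars = parse_vars("name=Smith,firstname=Henri")
```

After parsing, vars maps "name" to "Smith" and "firstname" to "Henri".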

Handling failures

During experiments, it is not uncommon for programs to crash or loop indefinitely. In this example, we present the try pattern, which executes a block and re-executes it several times if it raises an error or if a timeout expires.

The script below sleeps between 0 and 3 seconds (so the timeout will sometimes be reached). Run it several times to observe different results.

# file: try-ex1.rb
process :main do
    res = try :retry => 10, :timeout => 1 do
        log("starting sleep")
        system("sleep $(shuf -i 0-3 -n 1) && echo finished")
    end
    log res
end

Instead of using a system call, it is possible to use Ruby’s own random number generator. Arbitrary Ruby code can be included in a process as an anonymous activity introduced by the code construct:

# file: try-ex2.rb
process :main do
    res = try :retry => 10, :timeout => 1 do
        rnd = code { rand(5) }
        log "try #{rnd}"
        system("sleep #{rnd} && echo finished")
    end
    log res
end

It is also possible to specify the seed of the random number generator, in order to get reproducible random values: xpflow try-ex2.rb -V seed=67