StratOS Usage

StratOS is the working title of the Big Data framework that I have been developing while working as an Assistant Project Scientist at UC Riverside. Here's a draft of the documentation for the basic usage of the framework. It consists of some basic examples and documentation of each function call that is made available to the user. I'm keeping the documentation here until I have a better place (perhaps on astro.ucr.edu). The motivation for the project and architectural details will be provided in a paper.

Basic Usage: Batch Processing

Currently, the StratOS interface is only available through the Python module stratos. The most basic use case involves using StratOS as a batch processor. For example, to run the command gsnap -nt 01111 on a set of files:

 
from stratos import Processor

job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")

job.glob_batch("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")

results = job.get_results()

The import statement loads the StratOS Processor. The Processor class constructor can be provided with several arguments which modify the behavior of the framework. Only the master, which specifies the address and port number of the Mesos master, is absolutely required. In this example, I have also specified that each task will be allocated two threads (threads_per_task=2) and that the name of the job is "demo" (job_id="demo").

The Processor object is then used to launch a batch job. The job consists of running the command gsnap -nt 01111 on a set of snapshot files located in the HDFS, which is mounted at /dfs/:

    job.glob_batch("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")

The wildcards in the file name in the second argument of glob_batch() are expanded, so that /dfs/snapshots/*_2* matches all files in the snapshot directory whose names match the pattern *_2*. In this particular snapshot directory, the files all begin with "snapshot_" and end in a three-digit number, so *_2* matches snapshot_200, snapshot_201, and so on. The symbol %f% in the first argument of the glob_batch() method is a file-name placeholder, which is replaced with each of the expanded file names. Thus, the function call shown above launches the following commands on the cluster or data center:
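Conceptually, this expansion is ordinary glob matching followed by string substitution. The sketch below illustrates the idea on a fixed file list; expand_commands is a hypothetical helper, not part of the StratOS API, and in glob_batch() itself the file list would presumably come from something like Python's glob.glob():

```python
def expand_commands(template, filenames):
    """Replace the %f% placeholder in the command template with each
    file name, producing one concrete command per file."""
    return [template.replace("%f%", name) for name in filenames]

# In glob_batch(), the file list would come from expanding the pattern
# (e.g., glob.glob("/dfs/snapshots/*_2*")); here we use a fixed list.
snapshots = ["/dfs/snapshots/snapshot_200", "/dfs/snapshots/snapshot_201"]
commands = expand_commands("gsnap -nt 01111 %f%", snapshots)
```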

    gsnap -nt 01111 <[/dfs/snapshots/snapshot_200]>
    gsnap -nt 01111 <[/dfs/snapshots/snapshot_201]>
    gsnap -nt 01111 <[/dfs/snapshots/snapshot_202]>
      ⋮    
    gsnap -nt 01111 <[/dfs/snapshots/snapshot_299]>

Here, gsnap is a program that is installed on each machine in the cluster or data center. To launch a program that is not installed, the executable would have to be placed in a network file system (e.g., NFS or HDFS) that is accessible to all of the nodes in the cluster.

The brackets, <[ ]>, indicate that the enclosed file is stored on the HDFS. The framework automatically identifies which machines contain local copies of the data stored in each file. When these brackets are present, tasks are assigned to nodes that contain relevant data whenever possible. This minimizes network traffic and improves the data read speed (Note: In more advanced use-cases, several file names can be used in a single command; tasks are launched on machines that contain the largest fraction of the data stored in the set of files).

When using the glob_batch() method (or any *_batch method), the current thread in the Python interpreter waits for the launched tasks to complete (i.e., the thread is blocked). By default, the standard output from each task is sent back to the scheduler. This output is accessed by calling the get_results() method.

Continuing the example, the results can be further analyzed in Python quite easily. The output of the gsnap command is a tab-delimited list of numbers. The second number in the output list is the velocity dispersion of the stars in the snapshot. Those velocity dispersions can be extracted and further analyzed. For instance,

import numpy as np
from scipy import stats

vel_disp = np.array([float(res.split('\t')[1]) for res in results])

minimum = vel_disp.min()

mean = vel_disp.mean()

maximum = vel_disp.max()

standard_deviation = vel_disp.std()

median = np.median(vel_disp)

mode = stats.mode(vel_disp)

kurtosis = stats.kurtosis(vel_disp)

skewness = stats.skew(vel_disp)

fifthmoment = stats.moment(vel_disp, 5)

In the first line of this snippet, a list comprehension is used to extract the second number from each output string. These numbers are stored in a Python list as floating point values. The list is then used to construct a NumPy array, which can easily be analyzed, plotted, or used as input data in a larger analysis scheme.

It should be noted that the program, gsnap, did not need to be modified in order to use it with StratOS. StratOS is able to run any program that can be operated using command line arguments (i.e., any program that could be controlled by a Unix shell script). Also, note that it is possible to save the standard output of each task to the file system or discard it entirely, rather than sending the standard output back to the scheduler.

Streaming Mode

The batch mode of operation is useful in some situations, but it limits the user because, once the commands are submitted to the cluster, nothing can be done until all of the tasks have finished executing. In streaming mode, the framework does not block the thread after a set of commands are launched. More commands can be added to the job before the previously-submitted tasks have finished executing. In streaming mode, the user can also check the status of individual tasks, as well as the status of the job as a whole. Modifying the example above by replacing "batch" with "stream," we have the following:

 
from stratos import Processor

job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")

job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")

job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*_4*")

results = job.get_results()

The second glob_stream() function call is performed while the tasks specified in the first call are already beginning to run on the cluster; it simply adds more tasks to the list of tasks that are being scheduled for execution. Depending on data placement and node availability, some tasks specified in the second call may execute before tasks from the first call; StratOS will try to schedule tasks so that the entire job finishes as quickly as possible. Calling get_results() in streaming mode returns only the results that have arrived since the previous call to get_results(). For example, calling get_results() one minute after submitting the tasks to the scheduler might return 30 results. Waiting another 20 seconds and calling get_results() might return 4 new results. To collect all results in a list, the user could append the results to the list of previously-received results, like so:

 
results += job.get_results()
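These drain-on-read semantics can be modeled in a few lines of plain Python. The ResultStream class below is a toy model for illustration only, not StratOS code:

```python
class ResultStream:
    """Toy model of streaming get_results() semantics: each call
    returns only the results that arrived since the previous call."""

    def __init__(self):
        self._pending = []

    def deliver(self, result):
        # Simulates a task finishing and its output arriving.
        self._pending.append(result)

    def get_results(self):
        # Hand back everything pending and reset the buffer.
        out, self._pending = self._pending, []
        return out
```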

The fact that the processor does not block the thread in streaming mode also allows the user to run multiple jobs simultaneously from the same script. For instance, the user might launch one job called map and another named reduce:

from stratos import Processor

map = Processor(master="foam:5050", job_id="map")

reduce = Processor(master="foam:5050", job_id="red")

map.glob_stream("mapper %f% /dfs/map_results", "/dfs/raw_data/*")

reduce.glob_stream("reducer %f%", "/dfs/map_results/*")

results = reduce.get_results()

This is not a complete example of an implementation of MapReduce in StratOS, but it is a hint.

To inspect the status of each job programmatically, the Processor's stream object is accessed directly:

 
from stratos import Processor

job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")

job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*")

s = job.stream

number_of_tasks_currently_running = s.count_running_tasks()

list_of_tasks_currently_running = s.list_running_tasks()

number_of_unfinished_tasks = s.count_unfinished_tasks()

total_number_of_tasks_submitted = s.count_total_tasks()

command_associated_with_task_423 = s.get_command(423)

status_of_task_357 = s.get_status(357)

If, at some point, you wish to cause the stream to block the current thread, use the wait() method. Continuing the previous example,

command_associated_with_task_423 = s.get_command(423)

status_of_task_357 = s.get_status(357)

job.wait() # Returns True when all tasks have completed

print ("this line is executed after all tasks have completed")

Task Monitoring and Relaunching

StratOS allows the user to write scripts or programs that monitor the standard output and error streams as well as the resource usage of each task. If certain user-defined conditions are met, the monitoring script can terminate the task. The user also has the option of writing a script to perform additional actions and then submit one or more new tasks, based on the output of the monitoring script. Thus, the user can create applications which automatically detect and fix problems with individual tasks.

To do this, the user must specify a monitor, an update_interval, and (optionally) a relauncher in the Processor constructor.

 
from stratos import Processor

job = Processor(master = "foam:5050", 
                monitor = "/path/to/monitor", 
                relauncher = "/path/to/relauncher",
                update_interval = time_interval_in_seconds,
                job_id = "demo")

For each task running on the cluster, the monitor is called every update_interval seconds. For tasks that are expected to take hours to complete, the user may choose to set the update interval to a few minutes (e.g., update_interval = 300), while short-running tasks may be checked every second or two.

The monitor is provided with two command line arguments:

  1. the process id number of the task on the host node (pid).
  2. a string containing the standard output and standard error of the task. Only the output produced since the previous time the monitor was called is provided (task_output).

So, the signature is:

     monitor pid task_output

The task_output string contains the standard output and standard error, wrapped in identifying tags:

     task_output = '<stdout>standard output</stdout><stderr>standard error</stderr>'
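A monitor written in Python could separate the two streams with a regular expression before running its checks. The helper below is a sketch of ours, not part of StratOS:

```python
import re

def parse_task_output(task_output):
    """Split a '<stdout>...</stdout><stderr>...</stderr>' string into
    its stdout and stderr parts."""
    match = re.fullmatch(
        r"<stdout>(.*)</stdout><stderr>(.*)</stderr>",
        task_output,
        re.DOTALL,  # the captured output may span multiple lines
    )
    if match is None:
        raise ValueError("unexpected task_output format")
    return match.group(1), match.group(2)
```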

The monitor can then make use of this data to perform tests. For example, the PID can be used to access the /proc/pid virtual directory, which (on Linux) contains information about the process. Of particular interest are the files:

  • /proc/pid/cmdline
  • /proc/pid/stat
  • /proc/pid/statm
  • /proc/pid/status
  • /proc/pid/io
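For instance, /proc/pid/status contains human-readable "key: value" lines, including the task's resident memory. A monitor might extract that value as sketched below (shown parsing a sample string; a real monitor would read the file itself):

```python
def resident_memory_kb(status_text):
    """Extract the VmRSS value (resident set size, in kB) from the
    contents of /proc/<pid>/status; return None if absent."""
    for line in status_text.splitlines():
        if line.startswith("VmRSS:"):
            # Line looks like: 'VmRSS:      123456 kB'
            return int(line.split()[1])
    return None

# In a real monitor, status_text would come from
# open("/proc/%s/status" % pid).read()
sample = "Name:\tgsnap\nVmRSS:\t  123456 kB\nThreads:\t2\n"
rss = resident_memory_kb(sample)
```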
If the memory usage or CPU usage is found to be unusual, or the output streams contain keywords that indicate a problem with the task, the monitor can write "stratos_relaunch" to its standard output stream. For example, if the monitoring code is written as a shell script,

#!/bin/bash

pid=$1
output=$2

# do some analysis here, using $pid and $output  

# if the analysis reveals that the task needs to be stopped:

echo "stratos_relaunch"

# if you want to provide additional information:

echo "additional information goes here"

When the framework detects "stratos_relaunch" in the monitor's output stream, it terminates the task. If a relauncher is specified, it is then executed:

    relauncher output_from_monitor

where output_from_monitor is a string containing all of the output generated by the monitor, except for the leading "stratos_relaunch" keyword. The relauncher uses the information contained in this string to build a new command or list of new commands that will be submitted to the scheduler. These commands are written to the relauncher's standard output stream, one command per line (i.e., delimited by newline characters). If your relauncher is written in C++, it would look something like this:

#include <iostream>

int main(int argc, char* argv[])
{
   if (argc != 2)
   {
      return 1;
   }
  
   // read argv[1] here and create new commands

   std::cout << "first command\n";
   std::cout << "second command\n";
   std::cout << "third command\n";

   return 0;
}

StratOS then sends these new tasks to the scheduler. This feature works in batch mode as well as streaming mode.

Full Processor Details

The constructor for the Processor class accepts the following parameters (default values shown):

Processor(master,
          role="*",
          threads_per_task=2,
          mem_per_task=1024,
          disk_per_task=2048,
          monitor="",
          relauncher="",
          update_interval=10,
          output="scheduler",
          log_directory="",
          job_id="untitled",
          settings=None)

Where,

master
the hostname:port of the Mesos master
role
specifies the Mesos "role" that the job is assigned. Roles are Mesos' way of limiting the resources for various types of tasks that are run on the cluster or data center. Roles are defined by the system administrator.
threads_per_task
the number of threads allocated to each task
mem_per_task
the amount of system memory (in MB) allocated to each task
disk_per_task
the amount of disk space (in MB) allocated to each task
monitor
the full file name of the monitoring program
relauncher
the full file name of the relauncher program
update_interval
the time interval (in seconds) between calls to the monitoring program
output
The destination of each task's standard output. Options are:
  • scheduler: stdout is sent back to the scheduler.
  • disk: stdout is saved to a file in the log_directory.
  • mesos: stdout is logged by mesos (it can be accessed via the Mesos web interface).
  • none: stdout is discarded.
log_directory
specifies the directory in which to store the standard output of each task if output="disk"
job_id
specifies the name of the job. This allows the user to identify the job in the Mesos web interface.
settings
an object which stores all of the settings.

Each of the following methods exists as a pair—one for batch jobs and one for streaming jobs.

glob_batch(command, pattern)
glob_stream(command, pattern)

Performs a command on a collection of files whose names match a particular pattern. This is useful for running the same command on all files in a particular directory. The command is parameterized with the filename placeholder %f%.

command is a string containing the path to an executable, command flags, and a filename indicated with %f%.

pattern is a string containing a pattern recognizable by the python glob.glob() function.

template_batch(command_template, params)
template_stream(command_template, params)

Performs commands constructed from a template. This is useful for running a set of commands which differ in their arguments. For example, if we wanted to run the same analysis on several snapshots from various directions, we could do this:

        template_batch("program -direction %c% %c%", params)

where

           params = ["0.3 1.1 0.67, file1",     OR    params  = [["0.3 1.1 0.67", "file1"],
                     "0.2 0.9 0.7, file1",                       ["0.2 0.9 0.7", "file1"],
                     "0.3 1.1 0.67, file2",                      ["0.3 1.1 0.67", "file2"],
                     "0.5 1.5 0.47, file2"]                      ["0.5 1.5 0.47", "file2"]]

command_template A string containing the general pattern of the command with placeholders, %c%, where the parameters from the params variable will be substituted.

params A list of strings or a list containing lists of strings:

["string", "string", "string"]   OR   [["string"], ["string"], ["string"]]

If the first form is used, commas act as delimiters. Thus,

["s1, s2, s3"]

is converted to

[["s1", "s2", "s3"]]

and

["s1, s2, s3", "s4, s5, s6"]

becomes

[["s1", "s2", "s3"], ["s4", "s5", "s6"]]
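The conversion described above, together with the %c% substitution, can be sketched in a few lines of Python. Both helper names are hypothetical, not part of the StratOS API:

```python
def normalize_params(params):
    """Convert a list of comma-delimited strings into a list of lists
    of strings; lists of lists pass through unchanged."""
    return [
        [s.strip() for s in p.split(",")] if isinstance(p, str) else list(p)
        for p in params
    ]

def fill_template(template, param_group):
    """Replace successive %c% placeholders with successive parameters."""
    command = template
    for value in param_group:
        command = command.replace("%c%", value, 1)
    return command

params = normalize_params(["0.3 1.1 0.67, file1", "0.2 0.9 0.7, file1"])
commands = [fill_template("program -direction %c% %c%", p) for p in params]
```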

A concrete example is provided in the Template Example section below.

explicit_batch(commands)
explicit_stream(commands)

These methods launch any set of arbitrary commands. The previously-introduced command submission methods are simply convenience wrappers around these two core methods.

commands is a string of commands, delimited by newline characters or semicolons.
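That splitting behavior can be pictured as follows; this is a plain-Python illustration with a hypothetical helper name, not StratOS internals:

```python
import re

def split_commands(commands):
    """Split a command string on newlines or semicolons, dropping
    empty entries."""
    return [c.strip() for c in re.split(r"[\n;]", commands) if c.strip()]

cmds = split_commands("gsnap -nt 01111 a; gsnap -nt 01111 b\ngsnap -nt 01111 c")
```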

wait()

Blocks the thread until all submitted tasks have completed. Returns True on success; returns False if there is no stream on which to wait.

Stream methods

The stream object, which is created when one of the Processor's *stream() methods is invoked, has the following methods. Most of these should be self-explanatory:
add()                     add a new command / task
closed()                  True if wait() has already been called; False if the stream is still open
count_completed_tasks()
count_lost_tasks()
count_running_tasks()
count_total_tasks()
count_unfinished_tasks()
get_command()
get_results()
get_status()
list_completed_tasks()
list_lost_tasks()
list_running_tasks()
list_unfinished_tasks()
wait()                   The same as the wait() described above. 

Streaming vs. Batch Mode

The reader may have noticed that using a batch command, such as glob_batch(), is similar to calling glob_stream(), followed immediately by wait(). The difference between the two is that there is less overhead involved in using a batch command because no stream object needs to be created. The batch command also tends to return control to the parent thread more quickly than the stream command. In most cases, it is better to use the streaming mode of operation, due to the additional features that it offers. Batch mode is available for certain cases in which latency is important.

Template Example:

Suppose you wish to use StratOS as a batch processor to render frames of an animation of a galaxy model rotating about some point. Suppose you can use a program called galaxy_view to render the individual frames of the animation. To use galaxy_view, you have to provide a settings file (config), an input file, a frame number, and a viewing direction and orientation, which requires at least four numbers:

  • 1–3: The position of the virtual camera with respect to the center of the point of interest. Suppose this is given in spherical coordinates (r, θ, φ).
  • 4: The orientation of the camera, α (to specify which direction is up).

The program might be invoked like this:

     galaxy_view config distance theta phi alpha model_file frame_number

To animate a rotation, you can view the model from different directions by varying phi:
import math
import numpy as np
from stratos import Processor

number_of_frames = 1200

twopi = 2 * math.pi

dphi = twopi / number_of_frames

model = "/path/to/galaxy/model/in/HDFS"

config = "path/to/config/file/in/HDFS"

phis = np.arange(0, twopi, dphi)

params = [[str(phis[i]), str(i)] for i in range(phis.size)]

job = Processor(master="foam:5050", threads_per_task=8, 
                job_id="animation")

job.template_stream("galaxy_view " + config
                    + " 150 0 %c% 0 " 
                    + model + " %c%", params)

The distance, theta, and alpha have been explicitly written in the command string (150, 0, and 0).
