StratOS is the working title of the Big Data framework that I have been developing while working as an Assistant Project Scientist at UC Riverside. Here’s a draft of the documentation for the basic usage of the framework: a few introductory examples and a description of each function call that is made available to the user. I’m keeping the documentation here until I have a better place (perhaps on astro.ucr.edu). The motivation for the project and architectural details will be provided in a paper.
Basic Usage: Batch Processing
Currently, the StratOS interface is only available through the Python module, ‘stratos’. The most basic use-case involves using StratOS as a batch processor. For example, to run the command gsnap -nt 01111 on a set of files:
from stratos import Processor
job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")
job.glob_batch("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")
results = job.get_results()
The import statement loads the StratOS Processor. The Processor class constructor can be provided with several arguments which modify the behavior of the framework. Only the master, which specifies the address and port number of the Mesos master, is absolutely required. In this example, I have also specified that each task will be allocated two threads (threads_per_task=2) and that the name of the job is “demo” (job_id="demo").
The Processor object is then used to launch a batch job. The job consists of running the command gsnap -nt 01111 on a set of snapshot files located in the HDFS, which is mounted at /dfs/:
job.glob_batch("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")
The wildcards in the filename in the second argument of glob_batch() are expanded, so that /dfs/snapshots/*_2* matches all files in the snapshot directory that contain the pattern *_2*. In this particular snapshot directory, the files all begin with “snapshot_” and end in a three-digit number, so *_2* matches snapshot_200, snapshot_201, and so on. The symbol %f% in the first argument of the glob_batch() method is a file name placeholder, which is then replaced with the expanded file names. Thus, the function call shown above launches the following commands on the cluster or data center:
gsnap -nt 01111 <[/dfs/snapshots/snapshot_200]>
gsnap -nt 01111 <[/dfs/snapshots/snapshot_201]>
gsnap -nt 01111 <[/dfs/snapshots/snapshot_202]>
⋮
gsnap -nt 01111 <[/dfs/snapshots/snapshot_299]>
Here, gsnap is a program that is installed on each machine in the cluster / data center. To launch a program that is not installed, the executable would have to be placed in a network file system (e.g., NFS or HDFS) that is accessible by all of the nodes in the cluster.
The brackets, <[ ]>, indicate that the enclosed file is stored on the HDFS. The framework automatically identifies which machines contain local copies of the data stored in each file. When these brackets are present, tasks are assigned to nodes that contain relevant data whenever possible. This minimizes network traffic and improves the data read speed (Note: In more advanced use-cases, several file names can be used in a single command; tasks are launched on machines that contain the largest fraction of the data stored in the set of files).
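As a rough illustration, such a multi-file command could be submitted with the explicit_batch() method described later. This is only a sketch: the merge_snaps program is a placeholder, and the idea that the user writes the <[ ]> brackets directly in an explicit command is an assumption, not something stated elsewhere in this documentation.

from stratos import Processor

job = Processor(master="foam:5050", job_id="merge-demo")
# Hypothetical command: merge_snaps is a placeholder program, and the <[ ]>
# brackets are assumed to mark HDFS files in user-written commands. If the
# data-locality optimization applies, the task would be assigned to the node
# holding the largest fraction of the two files' data.
job.explicit_batch("merge_snaps <[/dfs/snapshots/snapshot_200]> "
                   "<[/dfs/snapshots/snapshot_201]>")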
When using the glob_batch() method (or any *_batch method), the current thread in the Python interpreter waits for the launched tasks to complete (i.e., the thread is blocked). By default, the standard output from each task is sent back to the scheduler. This output is accessed by calling the get_results() method.
Continuing the example, the results can be further analyzed in Python quite easily. The output of the gsnap command is a tab-delimited list of numbers. The second number in the output list is the velocity dispersion of the stars in the snapshot. Those velocity dispersions can be extracted and further analyzed. For instance,
import numpy as np
from scipy import stats
vel_disp = np.array([float(res.split('\t')[1]) for res in results])
minimum = vel_disp.min()
mean = vel_disp.mean()
maximum = vel_disp.max()
standard_deviation = vel_disp.std()
median = np.median(vel_disp)
mode = stats.mode(vel_disp)
kurtosis = stats.kurtosis(vel_disp)
skewness = stats.skew(vel_disp)
fifthmoment = stats.moment(vel_disp, 5)
In the line defining vel_disp, a list comprehension is used to extract the second number from each output string. These numbers are stored in a Python list as floating point values. The list is then used to construct a NumPy array, which can easily be analyzed, plotted, or used as input data in a larger analysis scheme.
It should be noted that the program, gsnap, did not need to be modified in order to use it with StratOS. StratOS is able to run any program that can be operated using command line arguments (i.e., any program that could be controlled by a Unix shell script). Also, note that it is possible to save the standard output of each task to the file system or discard it entirely, rather than sending the standard output back to the scheduler.
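A rough sketch of the latter, using the output and log_directory constructor parameters described under Full Processor Details below (the log directory path is a placeholder):

from stratos import Processor

# Save each task's stdout to a file in a (placeholder) log directory
# instead of sending it back to the scheduler.
job = Processor(master="foam:5050", output="disk",
                log_directory="/dfs/task_logs", job_id="demo")

# Alternatively, discard the standard output entirely.
quiet_job = Processor(master="foam:5050", output="none", job_id="quiet-demo")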
Streaming Mode
The batch mode of operation is useful in some situations, but it limits the user because, once the commands are submitted to the cluster, nothing can be done until all of the tasks have finished executing. In streaming mode, the framework does not block the thread after a set of commands is launched. More commands can be added to the job before the previously-submitted tasks have finished executing. In streaming mode, the user can also check the status of individual tasks, as well as the status of the job as a whole. Modifying the example above by replacing “batch” with “stream,” we have the following:
from stratos import Processor
job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")
job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")
job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*_4*")
results = job.get_results()
The second glob_stream() function call is performed while the tasks specified in the first call are already beginning to run on the cluster; it simply adds more tasks to the list of tasks that are being scheduled for execution. Depending on data placement and node availability, some tasks specified in the second call may execute before tasks from the first call; StratOS will try to schedule tasks so that the entire job finishes as quickly as possible. Calling get_results() in streaming mode returns only the results that have arrived since the previous call to get_results(). For example, calling get_results() one minute after submitting the tasks to the scheduler might return 30 results. Waiting another 20 seconds and calling get_results() might return 4 new results. To collect all results in a list, the user could append the results to the list of previously-received results, like so:
results += job.get_results()
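For example, a simple polling loop (a sketch that relies only on the stream methods listed later; the sleep interval is arbitrary) collects results as they arrive:

import time

results = []
# Poll until every submitted task has finished, accumulating whatever
# results have arrived since the previous call to get_results().
while job.stream.count_unfinished_tasks() > 0:
    results += job.get_results()
    time.sleep(10)  # arbitrary polling interval
results += job.get_results()  # pick up results that arrived after the last poll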
The fact that the processor does not block the thread in streaming mode also allows the user to run multiple jobs simultaneously from the same script. For instance, the user might launch one job called map and another named reduce:
from stratos import Processor
map = Processor(master="foam:5050", job_id="map")
reduce = Processor(master="foam:5050", job_id="red")
map.glob_stream("mapper %f% /dfs/map_results", "/dfs/raw_data/*")
reduce.glob_stream("reducer %f%", "/dfs/map_results/*")
results = reduce.get_results()
This is not a complete example of an implementation of MapReduce in StratOS, but it is a hint.
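A slightly more complete sketch would wait for the map job to finish before submitting the reduce tasks; here the mapper and reducer programs, their arguments, and the directory layout are placeholders:

from stratos import Processor

map = Processor(master="foam:5050", job_id="map")
reduce = Processor(master="foam:5050", job_id="red")

# Run the (placeholder) mapper over the raw data, writing intermediate
# results to /dfs/map_results.
map.glob_stream("mapper %f% /dfs/map_results", "/dfs/raw_data/*")
map.wait()  # block until every map task has completed

# Only then run the (placeholder) reducer over the intermediate files.
reduce.glob_stream("reducer %f%", "/dfs/map_results/*")
results = reduce.get_results()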
To inspect the status of each job programmatically, the Processor’s stream object is accessed directly:
from stratos import Processor
job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")
job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*")
s = job.stream
number_of_tasks_currently_running = s.count_running_tasks()
list_of_tasks_currently_running = s.list_running_tasks()
number_of_unfinished_tasks = s.count_unfinished_tasks()
total_number_of_tasks_submitted = s.count_total_tasks()
command_associated_with_task_423 = s.get_command(423)
status_of_task_357 = s.get_status(357)
If, at some point, you wish to cause the stream to block the current thread, use the wait() method. Continuing the previous example,
command_associated_with_task_423 = s.get_command(423)
status_of_task_357 = s.get_status(357)
job.wait() # Returns True when all tasks have completed
print("this line is executed after all tasks have completed")
Task Monitoring and Relaunching
StratOS allows the user to write scripts or programs that monitor the standard output and error streams as well as the resource usage of each task. If certain user-defined conditions are met, the monitoring script can terminate the task. The user also has the option of writing a script to perform additional actions and then submit one or more new tasks, based on the output of the monitoring script. Thus, the user can create applications which automatically detect and fix problems with individual tasks.
To do this, the user must specify monitor, an update_interval, and (optionally) a relauncher in the Processor constructor.
from stratos import Processor
job = Processor(master = "foam:5050",
monitor = "/path/to/monitor",
relauncher = "/path/to/relauncher",
update_interval = time_interval_in_seconds,
job_id = "demo")
For each task running on the cluster, the monitor is called every update_interval seconds. For tasks that are expected to take hours to complete, the user may choose to set the update interval to a few minutes (e.g., update_interval = 300), while short-running tasks may be checked every second or two.
The monitor is provided with two command line arguments:
- the process id number of the task on the host node (pid).
- a string containing the standard output and standard error of the task. Only the output produced since the previous time that the monitor was called is provided (task_output).
So, the signature is:
monitor pid task_output
The task_output string contains the standard output and standard error, wrapped in identifying tags:
task_output = '<stdout>standard output</stdout><stderr>standard error</stderr>'
The monitor can then make use of this data to perform tests. For example, the PID can be used to access the /proc/pid virtual directory, which (on Linux) contains information about the process. Of particular interest are the files:
- /proc/pid/cmdline
- /proc/pid/stat
- /proc/pid/statm
- /proc/pid/status
- /proc/pid/io
If the memory usage or CPU usage is found to be unusual or the output streams contain keywords that indicate a problem with the task, the monitor can write “stratos_relaunch” to its standard output stream. For example, if the monitoring code is written as a shell script,
#!/bin/bash
pid=$1
output=$2
# do some analysis here, using $pid and $output
# if the analysis reveals that the task needs to be stopped:
echo "stratos_relaunch"
# if you want to provide additional information:
echo "additional information goes here"
When the framework detects “stratos_relaunch” in the monitor’s output stream, it terminates the task. If a relauncher is specified, the relauncher is then executed:
relauncher output_from_monitor
where output_from_monitor is a string containing all of the output generated by the monitor, except for the leading “stratos_relaunch” keyword. The relauncher uses the information contained in this string to build a new command or list of new commands that will be submitted to the scheduler. These commands are written to the relauncher’s standard output stream, one command per line (i.e., delimited by newline characters). If your relauncher is written in C++, it would look something like this:
#include <iostream>

int main(int argc, char* argv[])
{
    if (argc != 2)
    {
        return 1;
    }
    // read argv[1] here and create new commands
    std::cout << "first command\n";
    std::cout << "second command\n";
    std::cout << "third command\n";
    return 0;
}
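A relauncher written in Python would follow the same pattern; this is a minimal sketch in which the command strings are placeholders:

#!/usr/bin/env python
import sys

if len(sys.argv) != 2:
    sys.exit(1)

output_from_monitor = sys.argv[1]

# Inspect output_from_monitor here and construct replacement commands.
# One command is written per line; the commands below are placeholders.
print("first command")
print("second command")
print("third command")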
StratOS then sends these new tasks to the scheduler. This feature works in batch mode as well as streaming mode.
Full Processor Details
The constructor for the Processor class accepts the following parameters (default values shown):
Processor(master,
role="*",
threads_per_task="2",
mem_per_task="1024",
disk_per_task="2048",
monitor="",
relauncher="",
update_interval=10,
output="scheduler",
log_directory="",
job_id="untitled",
settings=None)
Where,
master
- the hostname:port of the Mesos master
role
- specifies the Mesos “role” that the job is assigned. Roles are Mesos’ way of limiting the resources for various types of tasks that are run on the cluster or data center. Roles are defined by the system administrator.
threads_per_task
- the number of threads allocated to each task
mem_per_task
- the amount of system memory (in MB) allocated to each task
disk_per_task
- the amount of disk space (in MB) allocated to each task
monitor
- the full file name of the monitoring program
relauncher
- the full file name of the relauncher program
update_interval
- the time interval (in seconds) between calls to the monitoring program
output
- The destination of each task’s standard output. Options are:
scheduler
: stdout is sent back to the scheduler.
disk
: stdout is saved to a file in the log_directory.
mesos
: stdout is logged by mesos (it can be accessed via the Mesos web interface).
none
: stdout is discarded.
log_directory
- specifies the directory in which to store the standard output of each task if
output="disk"
job_id
- specifies the name of the job. This allows the user to identify the job in the Mesos web interface.
settings
- an object which stores all of the settings.
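Putting several of these parameters together, a constructor call might look like the following sketch; the role name and resource amounts are placeholders:

from stratos import Processor

job = Processor(master="foam:5050",
                role="analysis",     # placeholder role defined by the administrator
                threads_per_task=4,
                mem_per_task=2048,   # MB of system memory per task
                disk_per_task=4096,  # MB of disk space per task
                job_id="full-demo")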
Each of the following methods exists as a pair—one for batch jobs and one for streaming jobs.
glob_batch(command, pattern)
glob_stream(command, pattern)
Performs a command on a collection of files whose names match a particular pattern. This is useful for running the same command on all files in a particular directory. The command is parameterized with the filename placeholder %f%.
command
is a string containing the path to an executable, command flags, and a filename indicated with %f%.
pattern
is a string containing a pattern recognizable by the python glob.glob() function.
template_batch(command_template, params)
template_stream(command_template, params)
Performs commands constructed from a template. This is useful for running a set of commands which differ in their arguments. For example, if we wanted to run the same analysis on several snapshots from various directions, we could do this:
template_batch("program -direction %c% %c%", params)
where
params = ["0.3 1.1 0.67, file1", OR params = [["0.3 1.1 0.67", "file1"],
"0.2 0.9 0.7, file1", ["0.2 0.9 0.7", "file1"],
"0.3 1.1 0.67, file2", ["0.3 1.1 0.67", "file2"],
"0.5 1.5 0.47, file2"] ["0.5 1.5 0.47", "file2"]]
command_template
A string containing the general pattern of the command with placeholders, %c%, where the parameters from the params variable will be substituted.
params
A list of strings or a list containing lists of strings:
["string", "string", "string"] OR [["string"], ["string"], ["string"]]
If the first form is used, commas act as delimiters. Thus,
["s1, s2, s3"]
is converted to
[["s1", "s2", "s3"]]
and
["s1, s2, s3", "s4, s5, s6"]
becomes
[["s1", "s2", "s3"], ["s4", "s5", "s6"]]
A concrete example is provided in the Template Example section below.
explicit_batch(commands)
explicit_stream(commands)
These methods launch any set of arbitrary commands. The previously-introduced command submission methods are simply convenience wrappers around these two core methods.
commands
is a string of commands, delimited by newline characters or semicolons.
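For instance, a handful of commands can be submitted in one call; this is a sketch in which the snapshot files are assumed to exist:

from stratos import Processor

job = Processor(master="foam:5050", job_id="explicit-demo")

# Commands are separated by newline characters (semicolons also work).
commands = ("gsnap -nt 01111 /dfs/snapshots/snapshot_200\n"
            "gsnap -nt 01111 /dfs/snapshots/snapshot_201\n"
            "gsnap -nt 11111 /dfs/snapshots/snapshot_300")
job.explicit_batch(commands)
results = job.get_results()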
wait()
Blocks the thread until all submitted tasks have completed. Returns True on success. Returns False if there is no stream on which to wait.
Stream methods
The stream object, which is created when one of the Processor’s *_stream() methods is invoked, has the following methods. Most of these should be self-explanatory:
add() add a new command / task
closed() False if the stream is open; True if wait() has already been called
count_completed_tasks()
count_lost_tasks()
count_running_tasks()
count_total_tasks()
count_unfinished_tasks()
get_command()
get_results()
get_status()
list_completed_tasks()
list_lost_tasks()
list_running_tasks()
list_unfinished_tasks()
wait() The same as the wait() described above.
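As an example of how these might be combined, the following sketch resubmits any tasks that were lost; it assumes that list_lost_tasks() returns task identifiers accepted by get_command() and that add() accepts a command string, neither of which is spelled out above:

# Resubmit every task that the framework reports as lost
# (the exact signatures of add() and list_lost_tasks() are assumed).
s = job.stream
for task_id in s.list_lost_tasks():
    s.add(s.get_command(task_id))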
Streaming vs. Batch Mode
The reader may have noticed that using a batch command, such as glob_batch(), is similar to calling glob_stream() followed immediately by wait(). The difference between the two is that there is less overhead involved in using a batch command because no stream object needs to be created. The batch command also tends to return control to the parent thread more quickly than the stream command. In most cases, it is better to use the streaming mode of operation, due to the additional features that it offers. Batch mode is available for certain cases in which latency is important.
Template Example:
Suppose you wish to use StratOS as a batch processor to render frames of an animation of a galaxy model rotating about some point. Suppose you can use a program called galaxy_view to render the individual frames of the animation. To use galaxy_view, you have to provide a settings file (config), an input file, a frame number, and a viewing direction and orientation, which requires at least four numbers:
- 1–3: The position of the virtual camera with respect to the point of interest. Suppose this is given in spherical coordinates (r, θ, φ).
- 4: The orientation of the camera, α (to specify which direction is up).
The program might be invoked like this:
galaxy_view config distance theta phi alpha model_file frame_number
In order to animate a rotation, you can view the model from different directions by varying phi.
import math
import numpy as np
from stratos import Processor
number_of_frames = 1200
twopi = 2 * math.pi
dphi = twopi / number_of_frames
model = "/path/to/galaxy/model/in/HDFS"
config = "path/to/config/file/in/HDFS"
phis = np.arange(0, twopi, dphi)
params = [[str(phis[i]), str(i)] for i in range(phis.size)]
job = Processor(master="foam:5050", threads_per_task=8,
                job_id="animation")
job.template_stream("galaxy_view " + config
                    + " 150 0 %c% 0 "
                    + model + " %c%", params)
The distance, theta, and alpha have been explicitly written in the command string (150, 0, and 0).
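Since template_stream() does not block, the script can end by waiting for the rendering tasks to finish; this short sketch relies only on the wait() and count_completed_tasks() methods listed above:

# Block until every frame has been rendered, then report how many
# rendering tasks completed.
job.wait()
print("rendered %d frames" % job.stream.count_completed_tasks())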