Wormhole

May 28th, 2015

While developing tools for StratOS, I wrote a nifty little program that I call “Wormhole,” which uses the Linux kernel’s inotify functionality to respond to filesystem events. More specifically, it watches a single directory and performs specified actions on any files that are moved into or written to the directory. Once the files are processed, the results are stored in another directory. The syntax is like this:

$ wormhole action entrance_directory exit_directory
where the action is either a program name or one of the built-in commands: --zip or --unzip. The program that is used as the action must take two arguments: an input file and an output directory. Wormhole expects the action to remove the input file from the entrance_directory as its final step. The simplest (very silly) example is:
$ wormhole mv /directory1/  /directory2/
In this example, the action is the move command, mv. Whenever a file is added to /directory1, it will automatically be moved to /directory2.
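
To make the action contract concrete, here is a minimal Python sketch of the same idea. This is not Wormhole’s implementation: it polls the directory rather than using inotify, and it simply relies on the action to remove each input file when it finishes.

import os
import subprocess
import time

def wormhole(action, entrance_dir, exit_dir, interval=1.0):
    """Run `action <file> <exit_dir>` on every file that appears in entrance_dir."""
    while True:
        for name in sorted(os.listdir(entrance_dir)):
            path = os.path.join(entrance_dir, name)
            if os.path.isfile(path):
                # The action is responsible for removing the input file,
                # so anything that is still present next iteration gets retried.
                subprocess.call([action, path, exit_dir])
        time.sleep(interval)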

A more realistic use case would involve a script that performs some useful operation on the files that enter the entrance_directory. Here’s a slightly less silly example action:

#! /bin/bash

# pdf_append.sh - A command for adding new pages to a PDF file
# usage: pdf_append.sh input_image output_directory
# where input_image is an image file that can be processed with
# ImageMagick's convert command and output_directory contains
# exactly one PDF document.
# Note that this is a very fragile script for demonstration
# purposes only.

input_image=$1
output_dir=$2

# convert image to PDF 
# (this assumes the input isn't already a PDF)

input_pdf="$input_image.pdf"
convert $input_image $input_pdf

# get the name of the PDF being appended to.
# assumes there is exactly one PDF file in output_dir 

output_pdf="$(ls $output_dir/*.pdf)"

temp_pdf="${output_pdf/.pdf/-temp.pdf}"

# append the input_pdf to the end of the output_pdf

mv $output_pdf $temp_pdf

pdfunite $temp_pdf $input_pdf $output_pdf
# these tools would also work: 
# pdftk $temp_pdf $input_pdf cat output $output_pdf 
# pdfjoin $temp_pdf $input_pdf $output_pdf

# delete intermediate files:

rm $input_image $input_pdf $temp_pdf

Suppose I create two directories, in/ and out/, in my current working directory. Inside the out/ directory, I place a PDF document. I then execute wormhole like this:
$ wormhole ./pdf_append.sh in out 
Now, if I save or move images into the in/ directory, they will automatically be appended to the PDF document in out/ as new pages.

Wormhole was initially developed to provide a means of automatically compressing and decompressing files before they are sent over a non-compressed network stream (specifically, NFS), so it has two built-in actions, --zip and --unzip. These actions currently use zlib to perform the compression, but they will eventually use the LZO compression library due to its compression speed. The input and output directories are placed in RAM-backed tmpfs filesystems in order to minimize latency. The sender places files into one “mouth” of the wormhole and the files are compressed and sent to the destination computer. The receiver watches the other mouth of the wormhole and decompresses files as they arrive.
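
For reference, the built-in --zip action behaves conceptually like the following Python sketch (the file-naming convention shown here is only an illustrative choice, not necessarily what Wormhole does):

import os
import zlib

def zip_action(input_file, output_dir):
    # Compress input_file with zlib and write the result to output_dir.
    with open(input_file, "rb") as f:
        data = f.read()
    out_path = os.path.join(output_dir, os.path.basename(input_file) + ".z")
    with open(out_path, "wb") as f:
        f.write(zlib.compress(data))
    # As with every Wormhole action, remove the input file as the final step.
    os.remove(input_file)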

The current version of Wormhole isn’t written to be a proper daemon, but it can be cheaply “daemonized” using nohup and output redirection. For example:

$ nohup wormhole --zip /path/to/in /path/to/out 1> /dev/null 2>/dev/null &
The source for the version of Wormhole described above can be found here.

StratOS Usage

January 6th, 2015

StratOS is the working title of the Big Data framework that I have been developing while working as an Assistant Project Scientist at UC Riverside. Here’s a draft of the documentation for the basic usage of the framework. It consists of some basic examples and documentation of each function call that is made available to the user. I’m keeping the documentation here until I have a better place (perhaps on astro.ucr.edu). The motivation for the project and architectural details will be provided in a paper.

Basic Usage: Batch Processing

Currently, the StratOS interface is only available through the Python module, ‘stratos.’ The most basic use-case involves using StratOS as a batch processor. For example, to run the command, gsnap -nt 01111, on a set of files:

 
from stratos import Processor

job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")
job.glob_batch("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")
results = job.get_results()

The import statement loads the StratOS Processor. The Processor class constructor can be provided with several arguments which modify the behavior of the framework. Only the master, which specifies the address and port number of the Mesos master, is absolutely required. In this example, I have also specified that each task will be allocated two threads (threads_per_task=2) and that the name of the job is “demo” (job_id="demo").

The Processor object is then used to launch a batch job. The job consists of running the command gsnap -nt 01111 on a set of snapshot files located in the HDFS, which is mounted at /dfs/:

    job.glob_batch("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")

The wildcards in the second argument of glob_batch() are expanded, so that /dfs/snapshots/*_2* matches all files in the snapshot directory that contain the pattern *_2*. In this particular snapshot directory, the files all begin with “snapshot_” and end in a three-digit number, so *_2* matches snapshot_200, snapshot_201, and so on. The symbol %f% in the first argument of the glob_batch() method is a filename placeholder, which is replaced with each expanded file name. Thus, the function call shown above launches the following commands on the cluster or data center:

    gsnap -nt 01111 <[/dfs/snapshots/snapshot_200]>
    gsnap -nt 01111 <[/dfs/snapshots/snapshot_201]>
    gsnap -nt 01111 <[/dfs/snapshots/snapshot_202]>
      ⋮    
    gsnap -nt 01111 <[/dfs/snapshots/snapshot_299]>

where gsnap is a program that is installed on each machine in the cluster or data center. To launch a program that is not installed, the executable would have to be placed in a network file system (e.g., NFS or HDFS) that is accessible to all of the nodes in the cluster.

The brackets, <[ ]>, indicate that the enclosed file is stored on the HDFS. The framework automatically identifies which machines contain local copies of the data stored in each file. When these brackets are present, tasks are assigned to nodes that contain relevant data whenever possible. This minimizes network traffic and improves the data read speed (Note: In more advanced use-cases, several file names can be used in a single command; tasks are launched on machines that contain the largest fraction of the data stored in the set of files).
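
Conceptually, the expansion performed by glob_batch() resembles the following plain-Python snippet (an illustration of the idea only, not StratOS internals):

import glob

command_template = "gsnap -nt 01111 %f%"
files = sorted(glob.glob("/dfs/snapshots/*_2*"))

# Substitute each matching file name for the %f% placeholder, wrapping it
# in <[ ]> to mark it as HDFS-resident, as in the commands shown above.
commands = [command_template.replace("%f%", "<[" + f + "]>") for f in files]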

When using the glob_batch() method (or any *_batch method) the current thread in the Python interpreter waits for the launched tasks to complete (i.e., the thread is blocked). By default, the standard output from each task is sent back to the scheduler. This output is accessed by calling the get_results() method.

Continuing the example, the results can be further analyzed in Python quite easily. The output of the gsnap command is a tab-delimited list of numbers. The second number in the output list is the velocity dispersion of the stars in the snapshot. Those velocity dispersions can be extracted and further analyzed. For instance,

import numpy as np
from scipy import stats

vel_disp = np.array([float(res.split('\t')[1]) for res in results])

minimum = vel_disp.min()
mean = vel_disp.mean()
maximum = vel_disp.max()
standard_deviation = vel_disp.std()
median = np.median(vel_disp)
mode = stats.mode(vel_disp)
kurtosis = stats.kurtosis(vel_disp)
skewness = stats.skew(vel_disp)
fifthmoment = stats.moment(vel_disp, 5)

In the line that defines vel_disp, a list comprehension is used to extract the second number from each output string. These numbers are stored in a Python list as floating point values, and the list is then used to construct a NumPy array, which can easily be analyzed, plotted, or used as input data in a larger analysis scheme.

It should be noted that the program, gsnap, did not need to be modified in order to use it with StratOS. StratOS is able to run any program that can be operated using command line arguments (i.e., any program that could be controlled by a Unix shell script). Also, note that it is possible to save the standard output of each task to the file system or discard it entirely, rather than sending the standard output back to the scheduler.
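
For instance, using the constructor options described later in this post, each task’s standard output could be written to disk instead (the log directory path here is only a placeholder):

from stratos import Processor

job = Processor(master="foam:5050", output="disk",
                log_directory="/dfs/task_logs",  # hypothetical location for the log files
                job_id="demo")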

Streaming Mode

The batch mode of operation is useful in some situations, but it limits the user because, once the commands are submitted to the cluster, nothing can be done until all of the tasks have finished executing. In streaming mode, the framework does not block the thread after a set of commands are launched. More commands can be added to the job before the previously-submitted tasks have finished executing. In streaming mode, the user can also check the status of individual tasks, as well as the status of the job as a whole. Modifying the example above by replacing “batch” with “stream,” we have the following:

 
from stratos import Processor

job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")
job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*_2*")
job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*_4*")
results = job.get_results()

The second glob_stream() function call is performed while the tasks specified in the first call are already beginning to run on the cluster; it simply adds more tasks to the list of tasks that are being scheduled for execution. Depending on data placement and node availability, some tasks specified in the second call may execute before tasks from the first call; StratOS will try to schedule tasks so that the entire job finishes as quickly as possible. Calling get_results() in streaming mode returns only the results that have arrived since the previous call to get_results(). For example, calling get_results() one minute after submitting the tasks to the scheduler might return 30 results. Waiting another 20 seconds and calling get_results() might return 4 new results. To collect all results in a list, the user could append the results to the list of previously-received results, like so:

 
results += job.get_results()
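
Combined with the stream methods described later in this post, this suggests a simple collection loop (a sketch, not part of the StratOS API itself):

import time

all_results = []
while job.stream.count_unfinished_tasks() > 0:
    all_results += job.get_results()  # collect whatever has arrived so far
    time.sleep(10)
all_results += job.get_results()  # pick up results that arrived after the last check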

The fact that the processor does not block the thread in streaming mode also allows the user to run multiple jobs simultaneously from the same script. For instance, the user might launch one job called map and another named reduce:

from stratos import Processor

map = Processor(master="foam:5050", job_id="map")
reduce = Processor(master="foam:5050", job_id="red")

map.glob_stream("mapper %f% /dfs/map_results", "/dfs/raw_data/*")
reduce.glob_stream("reducer %f%", "/dfs/map_results/*")

results = reduce.get_results()

This is not a complete example of an implementation of MapReduce in StratOS, but it is a hint.

To inspect the status of each job programmatically, the Processor’s stream object is accessed directly:

 
from stratos import Processor

job = Processor(master="foam:5050", threads_per_task=2, job_id="demo")
job.glob_stream("gsnap -nt 01111 %f%", "/dfs/snapshots/*")

s = job.stream

number_of_tasks_currently_running = s.count_running_tasks()
list_of_tasks_currently_running = s.list_running_tasks()
number_of_unfinished_tasks = s.count_unfinished_tasks()
total_number_of_tasks_submitted = s.count_total_tasks()
command_associated_with_task_423 = s.get_command(423)
status_of_task_357 = s.get_status(357)

If, at some point, you wish to cause the stream to block the current thread, use the wait() method. Continuing the previous example,

command_associated_with_task_423 = s.get_command(423)
status_of_task_357 = s.get_status(357)

job.wait() # Returns True when all tasks have completed

print("this line is executed after all tasks have completed")

Task Monitoring and Relaunching

StratOS allows the user to write scripts or programs that monitor the standard output and error streams as well as the resource usage of each task. If certain user-defined conditions are met, the monitoring script can terminate the task. The user also has the option of writing a script to perform additional actions and then submit one or more new tasks, based on the output of the monitoring script. Thus, the user can create applications which automatically detect and fix problems with individual tasks.

To do this, the user must specify a monitor, an update_interval, and (optionally) a relauncher in the Processor constructor.

 
from stratos import Processor

job = Processor(master = "foam:5050", 
                monitor = "/path/to/monitor", 
                relauncher = "/path/to/relauncher",
                update_interval = time_interval_in_seconds,
                job_id = "demo")

For each task running on the cluster, the monitor is called every update_interval seconds. For tasks that are expected to take hours to complete, the user may choose to set the update interval to a few minutes (e.g., update_interval = 300), while short-running tasks may be checked every second or two.

The monitor is provided with two command line arguments:

  1. the process id number of the task on the host node (pid).
  2. a string containing the standard output and standard error of the task. Only the output produced since the previous time that the monitor was called is provided (task_output).
So, the signature is:

     monitor pid task_output

The task_output string contains the standard output and standard error, wrapped in identifying tags:

     task_output = '<stdout>standard output</stdout><stderr>standard error</stderr>'

The monitor can then make use of this data to perform tests. For example, the PID can be used to access the /proc/pid virtual directory, which (on Linux) contains information about the process. Of particular interest are the files:

  • /proc/pid/cmdline
  • /proc/pid/stat
  • /proc/pid/statm
  • /proc/pid/status
  • /proc/pid/io
If the memory usage or CPU usage is found to be unusual, or the output streams contain keywords that indicate a problem with the task, the monitor can write “stratos_relaunch” to its standard output stream. For example, if the monitoring code is written as a shell script,

#!/bin/bash

pid=$1
output=$2

# do some analysis here, using $pid and $output  

# if the analysis reveals that the task needs to be stopped:

echo "stratos_relaunch"

# if you want to provide additional information:

echo "additional information goes here"

When the framework detects “stratos_relaunch” in the monitor’s output stream, it terminates the task. If a relauncher is specified, it is then executed:

    relauncher output_from_monitor

where output_from_monitor is a string containing all of the output generated by the monitor, except for the leading “stratos_relaunch” keyword. The relauncher uses the information contained in this string to build a new command or list of new commands that will be submitted to the scheduler. These commands are written to the relauncher’s standard output stream, one command per line (i.e., delimited by newline characters). If your relauncher is written in C++, it would look something like this:

#include <iostream>

int main(int argc, char* argv[])
{
   if (argc != 2)
   {
      return 1;
   }
  
   // read argv[1] here and create new commands

   std::cout << "first command\n";
   std::cout << "second command\n";
   std::cout << "third command\n";

   return 0;
}

StratOS then sends these new tasks to the scheduler. This feature works in batch mode as well as streaming mode.

Full Processor Details

The constructor for the Processor class accepts the following parameters (default values shown):
Processor(master,
           role="*",
           threads_per_task="2",
           mem_per_task="1024",
           disk_per_task="2048",
           monitor="",
           relauncher="",
           update_interval=10,
           output="scheduler",
           log_directory="",
           job_id="untitled",
           settings=None)

Where,

master
the hostname:port of the Mesos master
role
specifies the Mesos “role” that the job is assigned. Roles are Mesos’ way of limiting the resources for various types of tasks that are run on the cluster or data center. Roles are defined by the system administrator.
threads_per_task
the number of threads allocated to each task
mem_per_task
the amount of system memory (in MB) allocated to each task
disk_per_task
the amount of disk space (in MB) allocated to each task
monitor
the full file name of the monitoring program
relauncher
the full file name of the relauncher program
update_interval
the time interval (in seconds) between calls to the monitoring program
output
The destination of each task’s standard output. Options are:
  • scheduler: stdout is sent back to the scheduler.
  • disk: stdout is saved to a file in the log_directory.
  • mesos: stdout is logged by mesos (it can be accessed via the Mesos web interface).
  • none: stdout is discarded.
log_directory
specifies the directory in which to store the standard output of each task if output="disk"
job_id
specifies the name of the job. This allows the user to identify the job in the Mesos web interface.
settings
an object which stores all of the settings.

Each of the following methods exists as a pair—one for batch jobs and one for streaming jobs.

glob_batch(command, pattern)
glob_stream(command, pattern)

Performs a command on a collection of files whose names match a particular pattern. This is useful for running the same command on all files in a particular directory. The command is parameterized with the filename placeholder %f%.

command is a string containing the path to an executable, command flags, and a filename indicated with %f%.

pattern is a string containing a pattern recognizable by the Python glob.glob() function.

template_batch(command_template, params)
template_stream(command_template, params)

Performs commands constructed from a template. This is useful for running a set of commands which differ in their arguments. For example, if we wanted to run the same analysis on several snapshots from various directions, we could do this:

        template_batch("program -direction %c% %c%", params)

where

           params = ["0.3 1.1 0.67, file1",     OR    params  = [["0.3 1.1 0.67", "file1"],
                     "0.2 0.9 0.7, file1",                       ["0.2 0.9 0.7", "file1"],
                     "0.3 1.1 0.67, file2",                      ["0.3 1.1 0.67", "file2"],
                     "0.5 1.5 0.47, file2"]                      ["0.5 1.5 0.47", "file2"]]

command_template A string containing the general pattern of the command with placeholders, %c%, where the parameters from the params variable will be substituted.

params A list of strings or a list containing lists of strings:

["string", "string", "string"]   OR   [["string"], ["string"], ["string"]]

If the first form is used, commas act as delimiters. Thus,

["s1, s2, s3"]

is converted to

[["s1", "s2", "s3"]]

and

["s1, s2, s3", "s4, s5, s6"]

becomes

[["s1", "s2", "s3"], ["s4", "s5", "s6"]]

A concrete example is provided in the Template Example section below.

explicit_batch(commands)
explicit_stream(commands)

These methods launch any set of arbitrary commands. The previously-introduced command submission methods are simply convenience wrappers around these two core methods.

commands is a string of commands, delimited by newline characters or semicolons.
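
For example, the batch job from the beginning of this post could be submitted explicitly like this (a sketch reusing the commands shown earlier):

job.explicit_batch("gsnap -nt 01111 <[/dfs/snapshots/snapshot_200]>\n"
                   "gsnap -nt 01111 <[/dfs/snapshots/snapshot_201]>")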

wait()

Blocks the thread until all submitted tasks have completed. Returns True on success. Returns False if there is no stream on which to wait.

Stream methods

The stream object, which is created when one of the Processor’s *stream() methods is invoked, has the following methods. Most of these should be self-explanatory:
add()                     add a new command / task
closed()                  True if the stream is open; False if wait() has already been called
count_completed_tasks()
count_lost_tasks()
count_running_tasks()
count_total_tasks()
count_unfinished_tasks()
get_command()
get_results()
get_status()
list_completed_tasks()
list_lost_tasks()
list_running_tasks()
list_unfinished_tasks()
wait()                   The same as the wait() described above. 

Streaming vs. Batch Mode

The reader may have noticed that using a batch command, such as glob_batch(), is similar to calling glob_stream(), followed immediately by wait(). The difference between the two is that there is less overhead involved in using a batch command because no stream object needs to be created. The batch command also tends to return control to the parent thread more quickly than the stream command. In most cases, it is better to use the streaming mode of operation, due to the additional features that it offers. Batch mode is available for certain cases in which latency is important.

Template Example:

Suppose you wish to use StratOS as a batch processor to render frames of an animation of a galaxy model rotating about some point. Suppose you can use a program called galaxy_view to render the individual frames of the animation. To use galaxy_view, you have to provide a settings file (config), an input file, a frame number, and a viewing direction and orientation, which requires at least four numbers:

  • 1–3: The position of the virtual camera with respect to the center of the point of interest. Suppose this is given in spherical coordinates (r, θ, φ)
  • 4: The orientation of the camera, α (to specify which direction is up).
The program might be invoked like this:

 galaxy_view config distance theta phi alpha model_file frame_number
To animate a rotation, you can view the model from different directions by varying phi:
import math
import numpy as np
from stratos import Processor

number_of_frames = 1200
twopi = 2 * math.pi
dphi = twopi / number_of_frames

model = "/path/to/galaxy/model/in/HDFS"
config = "/path/to/config/file/in/HDFS"

phis = np.arange(0, twopi, dphi)
params = [[str(phis[i]), str(i)] for i in range(phis.size)]

job = Processor(master="foam:5050", threads_per_task=8,
                job_id="animation")

job.template_stream("galaxy_view " + config
                    + " 150 0 %c% 0 "
                    + model + " %c%", params)

The distance, theta, and alpha have been explicitly written in the command string (150, 0, and 0).

FUSE-DFS

August 14th, 2014

In order to allow existing software to access the Hadoop Distributed File System (HDFS) without modification, I have compiled and installed FUSE-DFS on my cluster. FUSE-DFS allows us to use FUSE (Filesystem in Userspace) to mount the HDFS as a local filesystem. Software can then access the contents of the HDFS in the same way that files on the local filesystem are accessed.

Since I am using the standard version of Hadoop (from hadoop.apache.org), rather than a distribution from Cloudera or another company, I had to compile and configure the filesystem myself. I ran into several issues along the way, so I thought that I should share my solution to some of the more difficult problems.

I began by reading a wiki page about Mountable HDFS. I had already downloaded the source for Hadoop 2.4.1, so I began attempting to compile the version of fuse_dfs included with the download. Upon trying to follow the directions for compiling fuse_dfs, I found that the directory structure in the instructions differed from the directory structure of the source tarball that I had downloaded. After spending some time attempting to adjust the instructions to apply to my source, I decided to compile the code manually. With more knowledge of CMake, I probably could have used it to build the code, but I don’t know it very well yet.

The source for fuse_dfs was located at hadoop-2.4.1-src/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs. I created a build directory in hadoop-2.4.1-src/hadoop-hdfs-project/hadoop-hdfs/src/main/native/ and then compiled all of the source files with…

$ gcc ../fuse-dfs/*.c -o fuse_dfs -D_FILE_OFFSET_BITS=64 -I .. -I ../libhdfs/ \
-L /usr/local/hadoop/lib/native/ \
-Wl,-rpath=/usr/local/hadoop/lib/native/:/usr/lib/jvm/java-7-oracle/jre/lib/amd64/server/ \
-lhdfs -lfuse -lpthread -lc -lm

where /usr/local/hadoop/lib/native/ is the location of libhdfs.so and /usr/lib/jvm/java-7-oracle/jre/lib/amd64/server/ is the location of libjvm.so. You may also need to make a link to Hadoop’s “config.h” in the fuse-dfs directory or do something else so that the preprocessor can locate config.h.

When I first attempted this, the version of libhdfs.so installed on my system was apparently a 32-bit build, so it could not be linked with fuse_dfs. I compiled libhdfs.so manually as well:

$ gcc -c -fPIC ../libhdfs/*.c -I /usr/lib/jvm/java-7-oracle/include/ \
-I /usr/lib/jvm/java-7-oracle/include/linux/ \
-I ../../../../../../hadoop-common-project/hadoop-common/target/native/

where the final include path specifies the location of config.h. I then linked it…

$ gcc -shared -fPIC -o libhdfs.so exception.o expect.o hdfs.o jni_helper.o \
-L /usr/lib/jvm/java-7-oracle/jre/lib/amd64/server/ -ljvm \
-Wl,-rpath=/usr/lib/jvm/java-7-oracle/jre/lib/amd64/server/

Once this was all finished, I installed fuse_dfs and fuse_dfs_wrapper.sh in /usr/local/hadoop/bin/, where all of the other Hadoop-related executables are located. Upon trying to mount my HDFS, I encountered errors telling me that certain .jar files could not be found and that CLASSPATH was not defined. The command

$ hadoop classpath

prints the relevant CLASSPATH, but the CLASSPATH that is actually needed is an explicit listing of all of the .jar files—not just the list of directories (note that the system does not understand the meaning of the wildcard, *). In order to make the list of .jar files, I built a command with awk, sed, ls, and sh and then set the CLASSPATH environment variable to the result of that command. This can probably be done with a shorter command, but this works:

export CLASSPATH=$(hadoop classpath | sed s/:/'\n'/g | awk '/\*$/ {print "ls", $0 ".jar"}' | sh | sed ':a;N;$!ba;s/\n/:/g')

This command ignores one path—the path to Hadoop’s configuration .xml files, which is /usr/local/hadoop/etc/hadoop/, in my case. So I add this directory as follows:

export CLASSPATH=/usr/local/hadoop/etc/hadoop/:$CLASSPATH

This CLASSPATH definition is inserted into my .bashrc file on all of the nodes. At this point, I was still unable to mount the drive because I did not have the proper privileges, so I added myself to the fuse group:

$ sudo adduser $USER fuse

Then, I had to uncomment the following line in /etc/fuse.conf:

user_allow_other

Finally, I was able to mount the filesystem:

$ fuse_dfs_wrapper.sh -d dfs://foam:8020 dfsmount/

Where “foam” is the hostname of the NameNode and dfsmount is the mountpoint. Here it is in action:

fuse_dfs

Research Cluster

July 28th, 2014

The first major task of my new position (Assistant Project Scientist) involved setting up a small cluster of GNU/Linux machines. These machines will be used for testing the ensemble analysis software framework that we will be developing. In this post, I’ll briefly describe the current hardware and software configuration of the cluster.

The cluster consists of eight worker nodes and one head node. The worker nodes are Supermicro MicroBlade servers (X9SCD-F). A 3.3 GHz quad-core Xeon and 32 GB of RAM are installed on each blade. The head node is temporarily a 16-core AMD Magny Cours system with 128 GB of RAM. I may write a separate post describing the permanent head node once the parts arrive and the machine is assembled. The nine nodes communicate over a gigabit Ethernet network, built around a 24-port unmanaged Cisco Small Business 100 series switch.

The worker nodes and switch are currently on my desk:

IMG_20140723_161012

IMG_20140723_161027

IMG_20140723_161036

Settings and Software

Since the head node will also be used as a workstation, it is running the desktop version of Ubuntu 14.04.1. The worker nodes are running the server version of the same operating system.

The head node is set up as a network gateway so that all other computers on the local network can have Internet access. DHCP software is also installed on the head node so that other computers on the network (i.e. computers other than the cluster nodes) can be assigned IP addresses automatically. Instructions for setting up a gateway can be found here.

The file /etc/hosts for worker node-0 currently looks like this:
127.0.0.1       localhost
 
192.168.0.1   foam   foam.cosmo-cluster.net
192.168.0.100 node-0 node-0.cosmo-cluster.net
192.168.0.101 node-1 node-1.cosmo-cluster.net
192.168.0.102 node-2 node-2.cosmo-cluster.net
192.168.0.103 node-3 node-3.cosmo-cluster.net
192.168.0.104 node-4 node-4.cosmo-cluster.net
192.168.0.105 node-5 node-5.cosmo-cluster.net
192.168.0.106 node-6 node-6.cosmo-cluster.net
192.168.0.107 node-7 node-7.cosmo-cluster.net
 
# The following lines are desirable for IPv6 capable hosts
::1     localhost ip6-localhost ip6-loopback
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

where “foam” is the name of the head node.

The corresponding /etc/network/interfaces file looks like this:

# This file describes the network interfaces available on your system
# and how to activate them. For more information, see interfaces(5).
 
# The loopback network interface
auto lo
iface lo inet loopback
 
# The primary network interface
auto p1p1 
iface p1p1 inet static
       address 192.168.0.100
       netmask 255.255.255.0
       network 192.168.0.0
       broadcast 192.168.0.255
       gateway 192.168.0.1
       dns-nameservers 192.168.0.1
       dns-search cosmo-cluster.net

Passwordless SSH has been set up so that various programs can automatically log into machines. Currently, I’m just using the standard OpenSSH that comes with Ubuntu, but I’ll eventually install the HPN-SSH patched version.

For software administration (configuration management), I am using Ansible. I selected Ansible after reading about several alternative configuration management software packages. As far as I can tell, Ansible is the most elegant solution and it’s probably among the easiest to use.

In Ansible’s “hosts” file (/etc/ansible/hosts), I have included the following statement:

[nodes]
node-[0:7]

This allows me to address all of the worker nodes simultaneously, using the group label, “nodes.” For example, the following ad hoc command displays the system time on all nodes by running the “date” command:

Selection_002

In addition to running ad hoc commands from the command line, it is possible to create rather intricate “playbooks” to automate many tasks easily.

Hadoop

I have installed Hadoop 2.4.1 and I am using Oracle Java 7. The hadoop software is installed in /usr/local/hadoop/. In my .bashrc file, I have the following:

# Set Hadoop-related environment variables
export HADOOP_HOME=/usr/local/hadoop
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
export HADOOP_INSTALL=/usr/local/hadoop
export HADOOP_PREFIX=/usr/local/hadoop
export PATH=$PATH:$HADOOP_INSTALL/bin
export PATH=$PATH:$HADOOP_INSTALL/sbin
export HADOOP_MAPRED_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_HOME=$HADOOP_INSTALL
export HADOOP_HDFS_HOME=$HADOOP_INSTALL
export YARN_HOME=$HADOOP_INSTALL
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
unalias fs &> /dev/null
alias fs="hadoop fs"
unalias hls &> /dev/null
alias hls="fs -ls"
 
# Add Hadoop bin/ directory to PATH
export PATH=$PATH:$HADOOP_HOME/bin

The hadoop configuration files in /usr/local/hadoop/etc/hadoop/ look like this:

core-site.xml:
<configuration>
   <property>
   <name>fs.defaultFS</name>
   <value>hdfs://foam/</value>
   </property>
   <property>
   <name>io.file.buffer.size</name>
   <value>4096</value>
   </property>
   <property>
   <name>hadoop.tmp.dir</name>
   <value>/data/temp</value>
   </property>
</configuration>
hdfs-site.xml:
<configuration>
<property>
   <name>dfs.replication</name>
   <value>2</value>
   <description>replication</description>
 </property>
 <property>
   <name>dfs.namenode.name.dir</name>
   <value>file:///usr/local/hadoop_store/hdfs/namenode</value>
   <description>namenode directory</description>
</property>
 <property>
   <name>dfs.datanode.data.dir</name>
   <value>file:///data/hdfs/datanode</value>
   <description>datanode directory</description>
 </property>
 <property>
   <name>dfs.blocksize</name>
   <value>134217728</value>
   <description>file system block size</description>
 </property>
 <property>
   <name>dfs.datanode.data.dir.perm</name>
   <value>777</value>
 </property>
</configuration>
yarn-site.xml:
<configuration>
 
<!-- Site specific YARN configuration properties -->
<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
   <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
   <name>yarn.resourcemanager.scheduler.address</name>
   <value>foam:8030</value>
</property>
<property>
   <name>yarn.resourcemanager.address</name>
   <value>foam:8032</value>
</property>
<property>
   <name>yarn.resourcemanager.webapp.address</name>
   <value>foam:8088</value>
</property>
<property>
   <name>yarn.resourcemanager.resource-tracker.address</name>
   <value>foam:8031</value>
</property>
<property>
   <name>yarn.resourcemanager.admin.address</name>
   <value>foam:8033</value>
</property>
<property>
   <name>yarn.resourcemanager.hostname</name>
   <value>foam</value>
</property>
<property>
   <name>yarn.scheduler.maximum-allocation-mb</name>
   <value>26624</value>
</property>
<property>
   <name>yarn.nodemanager.resource.memory-mb</name>
   <value>26624</value>
</property>
 
 
</configuration>
mapred-site.xml:
<configuration>
 <property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
 </property>
</configuration>

And hadoop-env.sh was modified to contain this:

# The java implementation to use.
export JAVA_HOME=/usr/lib/jvm/java-7-oracle

I am trying to build 64-bit native libraries for Hadoop (the libraries that go in /usr/local/hadoop/lib/native). The standard compiled version that is available for download comes with 32-bit native libraries. I have managed to create a 64-bit libhadoop.so, but not libhdfs.so. Hadoop works without these libraries (it just uses libraries that are already installed with Oracle Java 7), but it evidently runs faster when it uses the Hadoop-specific “native” versions.

MPI and NFS

NFS and the MPICH2 implementation of MPI were installed by following the guide here.

Next Steps

Next, I plan to experiment with Mesos and Hama.

My Ubuntu System

June 10th, 2014

I’ve used Ubuntu as my main OS since November of 2007. All of my M.S. and Ph.D. research and almost all of my programming experience has happened in Ubuntu. I do all of my video editing and nearly all of my photo editing in Ubuntu, as well. I rarely need to use Microsoft Windows, but when I do, I run it inside of a virtual machine in Ubuntu.

Many people dislike Ubuntu nowadays because of the Unity interface and various other reasons (e.g., Mir), but I am still rather happy with it—even after evaluating several alternative Linux-based operating systems (Note: Linux Mint 17 is a strong competitor). I thought that it might be helpful to share some details of the software that I use and a few of the settings that I change when I install Ubuntu. This entry primarily covers GUI software; however, a few command-line tools are mentioned. I’ll update this periodically as my preferences change (and as I learn new things). If you have any suggestions, please let me know.

Synaptic

I begin by installing the Synaptic Package Manager because it is faster and allows more flexibility than the Ubuntu Software Center.

$ sudo apt-get install synaptic

Dissertation

December 12th, 2013

On December 6th, 2013, I successfully defended my Ph.D. dissertation, entitled The Evolution of Stellar Velocity Dispersion in Galaxy Mergers. Later today, I will submit the final document to the UCR Graduate Division. The document primarily consists of three papers: a paper that I published in 2012, a second paper that will hopefully be accepted for publication soon, and a third paper that is still being written. Thus, two thirds of the dissertation is in pretty nice shape. The other third is more than a rough draft of a paper, but it is far from being polished. In case anyone is curious, the latest version of the document is here:


I am not particularly fond of the format that UCR requires, but I tried to make the best of it.


List Comprehensions

November 22nd, 2013

I find myself using list comprehensions in Python a lot lately. Here’s a nifty example that combines a ternary operation and a list comprehension:
resultArray = [i if (i > 0.01) else 0 for i in inputArray if i >= 0]
English translation: Create an output array that stores all of the non-negative values of the input array. If an input value is less than or equal to 0.01, set it to zero.

Explanation: Reading this starting in the middle, it says “For every entry, i, in inputArray, first test whether the entry is greater than or equal to zero. If and only if the entry, i, passes the test, continue with the statement in the first part of the brackets: set i to zero if it is less than or equal to 0.01, otherwise leave it unchanged.”
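
For example, with a small sample input:

inputArray = [-0.5, 0.005, 0.02, 3.0]
resultArray = [i if (i > 0.01) else 0 for i in inputArray if i >= 0]
# resultArray is now [0, 0.02, 3.0]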

The equivalent code in C++03 would be considerably more involved. To make it look simple, a programmer would have to do some extra work in advance, or add an extra dependency by using a feature from Boost or another library. In C++11, obtaining the same result is pretty simple because of the range-based for loop feature. If we assume the output array has already been created then the equivalent code in a single, condensed line of C++11 is:
for (auto& i : inputArray) { if (i >= 0) resultArray.push_back((i > 0.01) ? i : 0); }
This isn’t too bad. Of course, it’s still not quite as elegant as the Python syntax.

Vectorization with SSE

October 18th, 2013

Using OpenCL, CUDA, or OpenACC to take advantage of the computing power offered by GPGPUs is the most effective way to accelerate computationally expensive code nowadays. However, not all machines have powerful GPGPUs. On the other hand, all modern x86 processors from Intel and AMD support vector instructions in the form of Streaming SIMD Extensions (SSE through SSE4), and most new processors support Advanced Vector Extensions (AVX). Utilizing these instructions makes it easy to improve the performance of your code by as much as a factor of eight (AVX-512 will increase this to a factor of 16, but this is only for Intel MIC cards). In some cases, compilers can automatically vectorize pieces of code to take advantage of the CPU’s vector units. OpenMP 4.0 also lets you mark specific loops for vectorization (via #pragma omp simd), but writing code with vector operations in mind will generally yield better results. In this post, I will briefly explain how to use SSE to vectorize C and C++ code using GCC 4.8. Most of the instructions should also apply to LLVM/Clang and Microsoft Visual Studio.

SSE registers are 128 bits (16 bytes) wide. This means that each register can store 4 single precision floating point numbers, 2 double precision floating point numbers, or other combinations of data that add up to 16 bytes. Using SSE, we can perform 4 single precision or 2 double precision floating point operations simultaneously on one CPU core using one instruction. Note: a superscalar processor, containing multiple floating point units, can also perform multiple floating point operations simultaneously; however, a separate instruction is issued for each operation and the programmer has little, if any, control over which operations are performed simultaneously. Using SSE, we also have more control over the cache and prefetching. It’s even possible to bypass the cache entirely, which can be useful in some cases where cache pollution would cause a performance hit. With SSE, we can even eliminate some branches in the code, thereby reducing the chance of a branch misprediction, which would necessitate a pipeline flush. This can potentially improve performance further.

First-generation SSE instructions can be accessed using the header xmmintrin.h.  SSE2 and 3 instructions can be used by including emmintrin.h and pmmintrin.h, respectively. To access all vector extensions, including SSE4 and AVX, use immintrin.h. You also need to consult your compiler’s documentation to learn which flags / switches are required to enable the instructions. In the code example below, I demonstrate…

GSnap, Polished

September 8th, 2013


While preparing a paper introducing GSnap to the world, I have refined several of the features.  The code has also become more modular and the source documentation is now more detailed. Here’s a brief overview of what has been done:
  • Divided the source into five components: Core, IO (input/output), CLI, GUI, and Viz (visualization).
  • Used the Qt Framework’s QtScript functionality to add a nice scripting interface. The base scripting language is ECMAScript (JavaScript). This is augmented with custom functions which manipulate GSnap’s internal objects.
  • Re-implemented the depth of field effect as a separate class rather than a large, complicated member function of the volume rendering class. The new depth of field  effect uses a faster Gaussian blur algorithm and produces much better results because larger blur radii are now practical.
  • Improved the performance and memory usage of the volume renderer for a given quality level by using a std::priority_queue to accelerate the neighbor search. This allows higher-quality volume rendering in a reasonable amount of time.
  • Finished the gas temperature computation and color mapping feature.
  • Redesigned the snapshot I/O to make it easier to add other snapshot format readers.
  • Added a new visualization option, “–view stars,” which is intermediate between “–view particles” and “–view flux.” Star particles are rendered as points, but they are rendered in the context of the volume renderer. This means that interstellar dust is able to obscure and redden the starlight. The results are somewhat similar to Sunrise output. This option is better than the “–view flux” option for simulations containing hundreds of millions or billions of star particles.

In addition to that, many of the code formatting rules are now automatically enforced using AStyle and the coding guide now discourages the use of exceptions and RTTI (as do Google, the LLVM project, and others). Several GCC-specific features have been exploited in order to improve the performance of the code when it is compiled with g++. Overall, the final binary file is smaller by about 30% and the volume rendering code runs about 10% faster just because of the GCC-specific pragmas and function attributes combined with the set of compiler flags that are now used. The performance of the velocity dispersion statistics functionality improved negligibly. The “hot” and “cold” function attributes were particularly helpful.

Here are a few sample renderings…

First, a demonstration of the improved color mapping and depth of field features. The gas in the foreground galaxy is in focus while the gas in the background galaxy is blurred. This draws attention to the foreground object without the need to remove the background object from the scene. Also, note: dark red = cold gas, white = warm gas.

snapshot_1082

Without the depth of field effect, your attention is not as focused on the foreground object, since the background object is equally in focus.

snapshot_1199
Of course, foreground objects are also blurred. This is the same system as above, but rotated so that the background object is now in the front:

snapshot_0591

The effect works equally well with visual renderings:
snapshot_0581

snapshot_0221


Now, here’s a demonstration of the new –view stars option without including any gas in the rendering:

snapshot_300__particles
That’s pretty basic. Including gas (and dust) in the rendering process makes this look more like a galaxy:

snapshot_300__stars

But in a real galaxy, you can’t see the individual stars like this. In fact, the particles in the simulation don’t really represent stars; they are more like tracer particles which represent the density field in a statistical sense. For a more realistic representation, we use the full volume renderer, which smooths the particles into a continuum:

snapshot_300__flux1

For a bit more fun, we can look at the gas distribution, and then do some post-processing. Here’s the gas by itself:

snapshot_300__gas

If we add the gas image to the star image using photo editing software like ImageMagick, GIMP, or Photoshop, we get:

snapshot_300__stars+gas

The distribution of stars relative to the gas is interesting. Finally, if we add the dust-attenuated star image to the gas image, we get this nifty looking rendering:

snapshot_300__stars+gas+dust

User-defined Literals in C++11

April 28th, 2013

The new C++11 language offers many convenient new features. The auto and constexpr keywords, variadic templates, lambda expressions, uniform initialization, and the improved standard library are often mentioned, but there are many other less well-known changes. Since the LLVM/Clang and GCC compilers now support the full C++11 language, I decided to take a closer look at the language specification and begin using some of the new features. In the process, I stumbled upon a feature that evidently hasn’t received much publicity: user-defined literals. Among other things, this feature allows you to deal with unit conversions in a much more natural way. There’s an in-depth discussion here. I have summarized the unit conversion usage in the simple, artificial example below.

It has long been common to use type definitions or custom classes to keep track of units. For example, if I were working on a project that dealt with temperature, I would likely define the type kelvin to store temperature values:
typedef float kelvin;
Using the kelvin type to store temperatures makes the code easier to understand and, in some cases, it allows the compiler to find potential errors. Suppose that I need to hard-code a few temperature constants into the project for some reason. Further suppose that all of the constants that I am entering were written down in degrees Fahrenheit rather than kelvin. With C++98, I would have either converted the temperatures to kelvin before entering them into the code or, more likely, I would have written a short function to convert from degrees Fahrenheit to kelvin and then used the function to do my conversions. With the new user-defined literals, I would do the following:
constexpr kelvin operator"" _F(long double T)
{
    return 0.55555556 * (T - 32.0) + 273.15; 
}
Then, writing _F after a number, as in 129.4_F, would have the same effect as calling a function that returns kelvin,
_F(129.4);
So, the output of
const kelvin min_temperature = 129.4_F;

std::cout << min_temperature << std::endl;
would be 327.2611.
Furthermore, it could also be useful to define _K as
constexpr kelvin operator"" _K(long double T)
{
    return T;
}
Then, whenever I enter a temperature that is measured in kelvin, I would append _K to clarify the meaning to anyone reading the code (and also to the compiler). It gets even better; suppose kelvin is a class that can be initialized with a floating point number. If an overloaded function overloaded() had two different forms—overloaded(float) and overloaded(kelvin)—entering overloaded(239.1) would call the float form, while overloaded(239.1_K) would call the kelvin form. With C++98, we would have had to use a cast, as in
overloaded( static_cast<kelvin>(239.1) ) 
or at least write
overloaded( kelvin(239.1) ) 
Once again, this has been an artificial example. The general method illustrated here can be very useful in more complicated, real-world situations. Also, I have only shown a very specific example of the “cooked” form of user-defined literals for floating point data types. There are also “raw” forms. Refer to the links at the beginning of this post for more information.