Archive for the ‘Hadoop’ Category

Solution:

First, check the Apache website to make sure your Hadoop and Pig versions are compatible.

Then, make sure the following two environment variables are set correctly:

export PIG_CLASSPATH=~/hadoop-0.23.4/etc/hadoop

export HADOOP_HOME=~/hadoop-0.23.4


This post demonstrates the Hadoop DistributedShell example.

zhengqiu@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop org.apache.hadoop.yarn.applications.distributedshell.Client -jar ../share/hadoop/mapreduce/hadoop-yarn-applications-distributedshell-2.0.1-alpha.jar -help
12/10/19 15:49:33 INFO distributedshell.Client: Initializing Client
usage: Client
-appname <arg> Application Name. Default value -DistributedShell
-class <arg> Main class to be run for the ApplicationMaster.
-container_memory <arg> Amount of memory in MB to be requested to run the shell command
-debug Dump out debug information
-help Print usage
-jar <arg> Jar file containing the application master
-log_properties <arg> log4j.properties file
-master_memory <arg> Amount of memory in MB to be requested to run the application master
-num_containers <arg> No. of containers on which the shell command needs to be executed
-priority <arg> Application Priority. Default 0
-queue <arg> RM Queue in which this application is to be submitted
-shell_args <arg> Command line args for the shell script
-shell_cmd_priority <arg> Priority for the shell command containers
-shell_command <arg> Shell command to be executed by the Application Master
-shell_env <arg> Environment for shell script. Specified as env_key=env_val pairs
-shell_script <arg> Location of the shell script to be executed
-timeout <arg> Application timeout in milliseconds
-user <arg> User to run the application as

zhengqiu@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop org.apache.hadoop.yarn.applications.distributedshell.Client -jar ../share/hadoop/mapreduce/hadoop-yarn-applications-distributedshell-2.0.1-alpha.jar -shell_command ls
12/10/19 14:45:05 INFO distributedshell.Client: Initializing Client
12/10/19 14:45:05 INFO distributedshell.Client: Starting Client
12/10/19 14:45:05 INFO distributedshell.Client: Connecting to ResourceManager at /172.22.244.177:8040
12/10/19 14:45:05 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
12/10/19 14:45:06 INFO distributedshell.Client: Got Cluster metric info from ASM, numNodeManagers=2
12/10/19 14:45:06 INFO distributedshell.Client: Got Cluster node info from ASM
12/10/19 14:45:06 INFO distributedshell.Client: Got node report from ASM for, nodeId=hadoop4:46971, nodeAddresshadoop4:8042, nodeRackName/default-rack, nodeNumContainers0, nodeHealthStatusis_node_healthy: true, health_report: “”, last_health_report_time: 1350672194514,
12/10/19 14:45:06 INFO distributedshell.Client: Got node report from ASM for, nodeId=hadoop3:44447, nodeAddresshadoop3:8042, nodeRackName/default-rack, nodeNumContainers0, nodeHealthStatusis_node_healthy: true, health_report: “”, last_health_report_time: 1350672195177,
12/10/19 14:45:06 INFO distributedshell.Client: Queue info, queueName=default, queueCurrentCapacity=0.0, queueMaxCapacity=1.0, queueApplicationCount=0, queueChildQueueCount=0
12/10/19 14:45:06 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=SUBMIT_APPLICATIONS
12/10/19 14:45:06 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=ADMINISTER_QUEUE
12/10/19 14:45:06 INFO distributedshell.Client: Got new application id=application_1350584236302_0003
12/10/19 14:45:06 INFO distributedshell.Client: Min mem capabililty of resources in this cluster 128
12/10/19 14:45:06 INFO distributedshell.Client: Max mem capabililty of resources in this cluster 10240
12/10/19 14:45:06 INFO distributedshell.Client: AM memory specified below min threshold of cluster. Using min value., specified=10, min=128
12/10/19 14:45:06 INFO distributedshell.Client: Setting up application submission context for ASM
12/10/19 14:45:06 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment
12/10/19 14:45:07 INFO distributedshell.Client: Set the environment for the application master
12/10/19 14:45:07 INFO distributedshell.Client: Trying to generate classpath for app master from current thread’s classpath
12/10/19 14:45:07 INFO distributedshell.Client: Readable bytes from stream=8559
12/10/19 14:45:07 INFO distributedshell.Client: Setting up app master command
12/10/19 14:45:07 INFO distributedshell.Client: Completed setting up app master command ${JAVA_HOME}/bin/java -Xmx128m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --num_containers 1 --priority 0 --shell_command ls 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
12/10/19 14:45:07 INFO distributedshell.Client: Submitting application to ASM
12/10/19 14:45:08 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=, appMasterRpcPort=0, appStartTime=1350672290726, yarnAppState=SUBMITTED, distributedFinalState=UNDEFINED, appTrackingUrl=172.22.244.177:8088/proxy/application_1350584236302_0003/, appUser=zhengqiu
12/10/19 14:45:09 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=, appMasterRpcPort=0, appStartTime=1350672290726, yarnAppState=SUBMITTED, distributedFinalState=UNDEFINED, appTrackingUrl=172.22.244.177:8088/proxy/application_1350584236302_0003/, appUser=zhengqiu
12/10/19 14:45:10 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=, appMasterRpcPort=0, appStartTime=1350672290726, yarnAppState=SUBMITTED, distributedFinalState=UNDEFINED, appTrackingUrl=172.22.244.177:8088/proxy/application_1350584236302_0003/, appUser=zhengqiu
12/10/19 14:45:11 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350672290726, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=zhengqiu
12/10/19 14:45:12 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350672290726, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=zhengqiu
12/10/19 14:45:13 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350672290726, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=zhengqiu
12/10/19 14:45:14 INFO distributedshell.Client: Got application report from ASM for, appId=3, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350672290726, yarnAppState=FINISHED, distributedFinalState=FAILED, appTrackingUrl=, appUser=zhengqiu
12/10/19 14:45:14 INFO distributedshell.Client: Application did finished unsuccessfully. YarnState=FINISHED, DSFinalStatus=FAILED. Breaking monitoring loop
12/10/19 14:45:14 ERROR distributedshell.Client: Application failed to complete successfully
zhengqiu@hadoop:~/hadoop-2.0.1-alpha/bin$

Go to the ResourceManager web interface to check the log.

You may find a message like:
"……is running beyond virtual memory limits. Current usage: 35.4mb of 128.0mb physical memory used; 286.0mb of 268.8 virtual memory used. Killing container……"

This indicates that the virtual memory used (286.0 MB) exceeds the allowed virtual memory (268.8 MB, which is the 128 MB of physical memory multiplied by the default ratio of 2.1).
Solution: edit yarn-site.xml and add:
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>3</value>
</property>

yarn.nodemanager.vmem-pmem-ratio: The virtual memory usage of each task may exceed its physical memory limit by this ratio. The total amount of virtual memory used by tasks on the NodeManager may exceed its physical memory usage by this ratio.

By default, the value is 2.1, so increasing this value will increase the allowed virtual memory.
The same problem is also reported at http://x-rip.iteye.com/blog/1533106.
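
The arithmetic behind the error message can be sketched as follows. This is a simplified model, not NodeManager code: it assumes the allowed virtual memory is simply the container's physical memory multiplied by the ratio, and the class and method names (VmemLimit, vmemLimitMb, wouldBeKilled) are hypothetical. The numbers are taken from the log message above.

```java
import java.util.Locale;

public class VmemLimit {
    /** Allowed virtual memory in MB: physical allocation times the vmem-pmem ratio (simplified model). */
    static double vmemLimitMb(double containerPmemMb, double vmemPmemRatio) {
        return containerPmemMb * vmemPmemRatio;
    }

    /** True if the container uses more virtual memory than the limit allows. */
    static boolean wouldBeKilled(double usedVmemMb, double containerPmemMb, double vmemPmemRatio) {
        return usedVmemMb > vmemLimitMb(containerPmemMb, vmemPmemRatio);
    }

    public static void main(String[] args) {
        double pmemMb = 128.0;     // physical memory of the container, from the log
        double usedVmemMb = 286.0; // virtual memory in use, from the error message
        for (double ratio : new double[] {2.1, 3.0}) {
            System.out.println(String.format(Locale.ROOT,
                    "ratio %.1f: limit=%.1f MB, killed=%b",
                    ratio, vmemLimitMb(pmemMb, ratio), wouldBeKilled(usedVmemMb, pmemMb, ratio)));
        }
    }
}
```

With the default ratio of 2.1 the limit is 268.8 MB, below the 286.0 MB in use; with a ratio of 3 the limit rises to 384 MB, which leaves room for the container.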

Update the yarn-site.xml file on each node and restart hadoop. Run the command again:
zhengqiu@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop org.apache.hadoop.yarn.applications.distributedshell.Client -jar ../share/hadoop/mapreduce/hadoop-yarn-applications-distributedshell-2.0.1-alpha.jar -shell_command ls
12/10/19 15:19:48 INFO distributedshell.Client: Initializing Client
12/10/19 15:19:48 INFO distributedshell.Client: Starting Client
12/10/19 15:19:48 INFO distributedshell.Client: Connecting to ResourceManager at /172.22.244.177:8040
12/10/19 15:19:49 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
12/10/19 15:19:49 INFO distributedshell.Client: Got Cluster metric info from ASM, numNodeManagers=2
12/10/19 15:19:49 INFO distributedshell.Client: Got Cluster node info from ASM
12/10/19 15:19:49 INFO distributedshell.Client: Got node report from ASM for, nodeId=hadoop3:53328, nodeAddresshadoop3:8042, nodeRackName/default-rack, nodeNumContainers0, nodeHealthStatusis_node_healthy: true, health_report: “”, last_health_report_time: 1350674330637,
12/10/19 15:19:49 INFO distributedshell.Client: Got node report from ASM for, nodeId=hadoop4:41566, nodeAddresshadoop4:8042, nodeRackName/default-rack, nodeNumContainers0, nodeHealthStatusis_node_healthy: true, health_report: “”, last_health_report_time: 1350674332057,
12/10/19 15:19:49 INFO distributedshell.Client: Queue info, queueName=default, queueCurrentCapacity=0.0, queueMaxCapacity=1.0, queueApplicationCount=0, queueChildQueueCount=0
12/10/19 15:19:49 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=SUBMIT_APPLICATIONS
12/10/19 15:19:49 INFO distributedshell.Client: User ACL Info for Queue, queueName=default, userAcl=ADMINISTER_QUEUE
12/10/19 15:19:49 INFO distributedshell.Client: Got new application id=application_1350674345000_0001
12/10/19 15:19:49 INFO distributedshell.Client: Min mem capabililty of resources in this cluster 128
12/10/19 15:19:49 INFO distributedshell.Client: Max mem capabililty of resources in this cluster 10240
12/10/19 15:19:49 INFO distributedshell.Client: AM memory specified below min threshold of cluster. Using min value., specified=10, min=128
12/10/19 15:19:49 INFO distributedshell.Client: Setting up application submission context for ASM
12/10/19 15:19:49 INFO distributedshell.Client: Copy App Master jar from local filesystem and add to local environment
12/10/19 15:19:50 INFO distributedshell.Client: Set the environment for the application master
12/10/19 15:19:50 INFO distributedshell.Client: Trying to generate classpath for app master from current thread’s classpath
12/10/19 15:19:50 INFO distributedshell.Client: Readable bytes from stream=8559
12/10/19 15:19:50 INFO distributedshell.Client: Setting up app master command
12/10/19 15:19:50 INFO distributedshell.Client: Completed setting up app master command ${JAVA_HOME}/bin/java -Xmx128m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --num_containers 1 --priority 0 --shell_command ls 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
12/10/19 15:19:50 INFO distributedshell.Client: Submitting application to ASM
12/10/19 15:19:51 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=SUBMITTED, distributedFinalState=UNDEFINED, appTrackingUrl=172.22.244.177:8088/proxy/application_1350674345000_0001/, appUser=zhengqiu
12/10/19 15:19:52 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=SUBMITTED, distributedFinalState=UNDEFINED, appTrackingUrl=172.22.244.177:8088/proxy/application_1350674345000_0001/, appUser=zhengqiu
12/10/19 15:19:53 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=SUBMITTED, distributedFinalState=UNDEFINED, appTrackingUrl=172.22.244.177:8088/proxy/application_1350674345000_0001/, appUser=zhengqiu
12/10/19 15:19:54 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=N/A, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=SUBMITTED, distributedFinalState=UNDEFINED, appTrackingUrl=172.22.244.177:8088/proxy/application_1350674345000_0001/, appUser=zhengqiu
12/10/19 15:19:55 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=zhengqiu
12/10/19 15:19:56 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=zhengqiu
12/10/19 15:19:57 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=RUNNING, distributedFinalState=UNDEFINED, appTrackingUrl=, appUser=zhengqiu
12/10/19 15:19:58 INFO distributedshell.Client: Got application report from ASM for, appId=1, clientToken=null, appDiagnostics=, appMasterHost=, appQueue=, appMasterRpcPort=0, appStartTime=1350674374290, yarnAppState=FINISHED, distributedFinalState=SUCCEEDED, appTrackingUrl=, appUser=zhengqiu
12/10/19 15:19:58 INFO distributedshell.Client: Application has completed successfully. Breaking monitoring loop
12/10/19 15:19:58 INFO distributedshell.Client: Application completed successfully

Check the log for the output of the ls command:
zhengqiu@hadoop4:/tmp/logs/application_1350675606528_0023$ ll
total 16
drwx--x--- 4 zhengqiu zhengqiu 4096 2012-10-19 16:46 ./
drwxr-xr-x 22 zhengqiu zhengqiu 4096 2012-10-19 16:46 ../
drwx--x--- 2 zhengqiu zhengqiu 4096 2012-10-19 16:46 container_1350675606528_0023_01_000001/
drwx--x--- 2 zhengqiu zhengqiu 4096 2012-10-19 16:46 container_1350675606528_0023_01_000002/
zhengqiu@hadoop4:/tmp/logs/application_1350675606528_0023$ find
.
./container_1350675606528_0023_01_000001
./container_1350675606528_0023_01_000001/AppMaster.stderr
./container_1350675606528_0023_01_000001/AppMaster.stdout
./container_1350675606528_0023_01_000002
./container_1350675606528_0023_01_000002/stdout
./container_1350675606528_0023_01_000002/stderr
zhengqiu@hadoop4:/tmp/logs/application_1350675606528_0023$ more ./container_1350675606528_0023_01_000002/stdout
container_tokens
default_container_executor.sh
launch_container.sh
tmp

Run Linux cal command:
zhengqiu@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop org.apache.hadoop.yarn.applications.distributedshell.Client -jar ../share/hadoop/mapreduce/hadoop-yarn-applications-distributedshell-2.0.1-alpha.jar -debug -shell_command cal

Check log:
zhengqiu@hadoop4:/tmp/logs$ cd application_1350675606528_0030/
zhengqiu@hadoop4:/tmp/logs/application_1350675606528_0030$ find
.
./container_1350675606528_0030_01_000002
./container_1350675606528_0030_01_000002/stdout
./container_1350675606528_0030_01_000002/stderr
./container_1350675606528_0030_01_000001
./container_1350675606528_0030_01_000001/AppMaster.stderr
./container_1350675606528_0030_01_000001/AppMaster.stdout
zhengqiu@hadoop4:/tmp/logs/application_1350675606528_0030$ more ./container_1350675606528_0030_01_000002/stdout
    October 2012
Su Mo Tu We Th Fr Sa
    1  2  3  4  5  6
 7  8  9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30 31

Run the Linux cal command in two containers:
zhengqiu@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop org.apache.hadoop.yarn.applications.distributedshell.Client -jar ../share/hadoop/mapreduce/hadoop-yarn-applications-distributedshell-2.0.1-alpha.jar -debug -shell_command cal -num_containers 2

Check log:
On datanode hadoop3, the output was generated twice.

Run the cal command in three containers:
zhengqiu@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop org.apache.hadoop.yarn.applications.distributedshell.Client -jar ../share/hadoop/mapreduce/hadoop-yarn-applications-distributedshell-2.0.1-alpha.jar -debug -shell_command cal -num_containers 3

Check log:
On datanode hadoop4, the output was generated three times.

Reference: http://www.mail-archive.com/mapreduce-user@hadoop.apache.org/msg03853.html


This post gives step-by-step instructions for deploying a three-node Hadoop cluster on a virtual network using VirtualBox.

NameNode: 192.168.10.1 hadoop

ResourceManager: 192.168.10.2 hadoop2

DataNode: 192.168.10.3 hadoop3

Install VirtualBox.

Install Ubuntu in VirtualBox (install three copies, one per node, and name them hadoop, hadoop2 and hadoop3 respectively).

Download Ubuntu from http://releases.ubuntu.com/lucid/ubuntu-10.04.4-server-i386.iso

In the VirtualBox network settings, check "Enable Network Adapter" and select "Bridged Adapter".

In the installer, choose Install Ubuntu Server.

Use the same username on each node (in this example, I used zcai for all three nodes).

Choose OpenSSH Server to install.

Create passwordless ssh login between the nodes:
zcai@hadoop:~$ ssh-keygen -t rsa
zcai@hadoop:~$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Copy the keys to the other nodes:
zcai@hadoop:~$ scp -r ~/.ssh 192.168.10.2:~/
zcai@hadoop:~$ scp -r ~/.ssh 192.168.10.3:~/

Use ssh to log in to each node to make sure the keys work.

Install the Java JDK on each node:
$ sudo apt-get install openjdk-6-jdk

Download the Hadoop package and unpack it:
zcai@hadoop:~$ tar zxvf hadoop-2.0.1-alpha.tar.gz

Create and edit the configuration files.
Create a file hadoop-env.sh under ~/hadoop-2.0.1-alpha/etc/hadoop with the following content:
zcai@hadoop:~/hadoop-2.0.1-alpha/etc/hadoop$ more hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-6-openjdk/jre
export HADOOP_HOME=~/hadoop-2.0.1-alpha
export HADOOP_MAPRED_HOME=${HADOOP_HOME}
export HADOOP_COMMON_HOME=${HADOOP_HOME}
export HADOOP_HDFS_HOME=${HADOOP_HOME}
export YARN_HOME=${HADOOP_HOME}
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export YARN_CONF_DIR=${HADOOP_HOME}/etc/hadoop

Set JAVA_HOME in yarn-env.sh
zcai@hadoop:~/hadoop-2.0.1-alpha/etc/hadoop$ more yarn-env.sh
…………
# some Java parameters
export JAVA_HOME=/usr/lib/jvm/java-6-openjdk/jre
…………….
……………….

Edit yarn-site.xml:
zcai@hadoop:~/hadoop-2.0.1-alpha/etc/hadoop$ more yarn-site.xml
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.resourcemanager.address</name>
<value>192.168.10.2:8040</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>192.168.10.2:8025</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>192.168.10.2:8030</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>192.168.10.2:8031</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>192.168.10.2:8088</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce.shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>

Edit core-site.xml:
zcai@hadoop:~/hadoop-2.0.1-alpha/etc/hadoop$ more core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://192.168.10.1:8020</value>
</property>
</configuration>

Create a file mapred-site.xml under ~/hadoop-2.0.1-alpha/etc/hadoop with the following content:
zcai@hadoop:~/hadoop-2.0.1-alpha/etc/hadoop$ more mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.cluster.temp.dir</name>
<value>/tmp</value>
</property>
<property>
<name>mapreduce.cluster.local.dir</name>
<value>/local</value>
</property>
</configuration>

zcai@hadoop:~/hadoop-2.0.1-alpha/etc/hadoop$ more hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:///home/zcai/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:///home/zcai/datanode</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

zcai@hadoop:~/hadoop-2.0.1-alpha/etc/hadoop$ more slaves
192.168.10.3
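
Before copying the configuration to the other nodes, it can help to confirm that each *-site.xml file is well-formed and to see which properties it actually sets. The following is a minimal sketch that uses only the JDK's XML parser rather than Hadoop's own Configuration class. The class name SiteXmlCheck and its parse method are hypothetical, and the sketch assumes every property element has both a name and a value child.

```java
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class SiteXmlCheck {
    /** Parses Hadoop-style configuration XML into name/value pairs, in file order. */
    static Map<String, String> parse(InputStream in) throws Exception {
        // The parser throws an exception if the file is not well-formed XML.
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
        Map<String, String> props = new LinkedHashMap<>();
        NodeList list = doc.getElementsByTagName("property");
        for (int i = 0; i < list.getLength(); i++) {
            Element p = (Element) list.item(i);
            // Assumption: each <property> contains exactly one <name> and one <value>.
            String name = p.getElementsByTagName("name").item(0).getTextContent().trim();
            String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
            props.put(name, value);
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Pass a path such as etc/hadoop/hdfs-site.xml; without one, a built-in sample is parsed.
        String sample = "<configuration><property><name>dfs.replication</name>"
                + "<value>1</value></property></configuration>";
        try (InputStream in = args.length > 0
                ? new FileInputStream(args[0])
                : new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8))) {
            for (Map.Entry<String, String> e : parse(in).entrySet()) {
                System.out.println(e.getKey() + " = " + e.getValue());
            }
        }
    }
}
```

Printing the parsed names makes a misspelled property name easy to spot by eye; the sketch does not know which names Hadoop recognizes.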

Copy the configured Hadoop package to the other nodes:
zcai@hadoop:~$ scp -r hadoop-2.0.1-alpha 192.168.10.2:~/
zcai@hadoop:~$ scp -r hadoop-2.0.1-alpha 192.168.10.3:~/

Format the namenode (run this command on the namenode, 192.168.10.1):
zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hdfs namenode -format

Start HDFS (run on the namenode):
zcai@hadoop:~/hadoop-2.0.1-alpha/sbin$ ./start-dfs.sh
12/10/17 14:51:34 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
Starting namenodes on [172.22.244.221]
192.168.10.1: starting namenode, logging to /home/zcai/hadoop-2.0.1-alpha/logs/hadoop-zcai-namenode-hadoop.out
192.168.10.3: starting datanode, logging to /home/zcai/hadoop-2.0.1-alpha/logs/hadoop-zcai-datanode-hadoop3.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /home/zcai/hadoop-2.0.1-alpha/logs/hadoop-zcai-secondarynamenode-hadoop.out

Start YARN (run on the ResourceManager node, hadoop2):
zcai@hadoop2:~/hadoop-2.0.1-alpha/sbin$ ./start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /home/zcai/hadoop-2.0.1-alpha/logs/yarn-zcai-resourcemanager-hadoop2.out
192.168.10.3: starting nodemanager, logging to /home/zcai/hadoop-2.0.1-alpha/logs/yarn-zcai-nodemanager-hadoop3.out

If everything is working, jps shows the following processes running.
On the ResourceManager node:
zcai@hadoop2:~/hadoop-2.0.1-alpha/sbin$ jps
1811 ResourceManager
2062 Jps

On the namenode:
zcai@hadoop:~/hadoop-2.0.1-alpha/sbin$ jps
13079 NameNode
13362 Jps
13312 SecondaryNameNode

On datanode:
zcai@hadoop3:~/hadoop-2.0.1-alpha/sbin$ jps
9886 DataNode
10050 NodeManager
10237 Jps

Test HDFS:
zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -mkdir caitest
12/10/17 16:25:33 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
zhengqiu@hadoop3:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -lsr
lsr: DEPRECATED: Please use ‘ls -R’ instead.
12/10/17 16:25:48 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
drwxr-xr-x - zcai supergroup 0 2012-10-17 16:25 caitest
zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -put x caitest
12/10/17 16:26:11 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
zhengqiu@hadoop3:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -lsr
lsr: DEPRECATED: Please use ‘ls -R’ instead.
12/10/17 16:26:15 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
drwxr-xr-x - zcai supergroup 0 2012-10-17 16:26 caitest
-rw-r--r-- 1 zcai supergroup 12814 2012-10-17 16:26 caitest/x

Go to http://192.168.10.2:8088 (the yarn.resourcemanager.webapp.address set in yarn-site.xml) to open the ResourceManager web interface.

Run a MapReduce example application on the virtual Hadoop cluster to make sure it is working.
Download an earlier version of Hadoop, such as hadoop-0.20.XX.tar.gz, and uncompress it; you will find the example jar hadoop-examples-0.20-xxx.jar inside.

zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -mkdir test-input
12/10/18 11:34:21 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
zcai@hadoop3:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -put x test-input
12/10/18 11:34:35 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -ls -R
12/10/18 11:34:44 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
drwxr-xr-x - zcai supergroup 0 2012-10-18 11:34 test-input
-rw-r--r-- 1 zcai supergroup 12814 2012-10-18 11:34 test-input/x

zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop jar ~/hadoop-examples-0.20.205.0.jar wordcount test-input test-output
12/10/18 11:40:18 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
12/10/18 11:40:19 INFO input.FileInputFormat: Total input paths to process : 1
12/10/18 11:40:19 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/10/18 11:40:19 WARN snappy.LoadSnappy: Snappy native library not loaded
12/10/18 11:40:19 INFO mapreduce.JobSubmitter: number of splits:1
12/10/18 11:40:19 WARN conf.Configuration: mapreduce.combine.class is deprecated. Instead, use mapreduce.job.combine.class
12/10/18 11:40:19 WARN conf.Configuration: mapred.jar is deprecated. Instead, use mapreduce.job.jar
12/10/18 11:40:19 WARN conf.Configuration: mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
12/10/18 11:40:19 WARN conf.Configuration: mapred.job.name is deprecated. Instead, use mapreduce.job.name
12/10/18 11:40:19 WARN conf.Configuration: mapreduce.reduce.class is deprecated. Instead, use mapreduce.job.reduce.class
12/10/18 11:40:19 WARN conf.Configuration: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
12/10/18 11:40:19 WARN conf.Configuration: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
12/10/18 11:40:19 WARN conf.Configuration: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
12/10/18 11:40:19 WARN conf.Configuration: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
12/10/18 11:40:19 WARN conf.Configuration: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
12/10/18 11:40:19 WARN conf.Configuration: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
12/10/18 11:40:19 INFO mapred.ResourceMgrDelegate: Submitted application application_1350573728815_0002 to ResourceManager at /172.22.244.177:8040
12/10/18 11:40:19 INFO mapreduce.Job: The url to track the job: http://192.168.10.2:8088/proxy/application_1350573728815_0002/
12/10/18 11:40:19 INFO mapreduce.Job: Running job: job_1350573728815_0002
12/10/18 11:40:26 INFO mapreduce.Job: Job job_1350573728815_0002 running in uber mode : false
12/10/18 11:40:26 INFO mapreduce.Job: map 0% reduce 0%
12/10/18 11:40:31 INFO mapreduce.Job: map 100% reduce 0%
12/10/18 11:40:32 INFO mapreduce.Job: map 100% reduce 100%
12/10/18 11:40:32 INFO mapreduce.Job: Job job_1350573728815_0002 completed successfully
12/10/18 11:40:32 INFO mapreduce.Job: Counters: 43
File System Counters
FILE: Number of bytes read=12511
FILE: Number of bytes written=118934
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=12932
HDFS: Number of bytes written=11718
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=23944
Total time spent by all reduces in occupied slots (ms)=25504
Map-Reduce Framework
Map input records=38
Map output records=269
Map output bytes=13809
Map output materialized bytes=12271
Input split bytes=118
Combine input records=269
Combine output records=137
Reduce input groups=137
Reduce shuffle bytes=12271
Reduce input records=137
Reduce output records=137
Spilled Records=274
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=243
CPU time spent (ms)=960
Physical memory (bytes) snapshot=199864320
Virtual memory (bytes) snapshot=749105152
Total committed heap usage (bytes)=136843264
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=12814
File Output Format Counters
Bytes Written=11718

Check the result
zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -ls -R
12/10/18 11:59:19 WARN util.KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
drwxr-xr-x - zhengqiu supergroup 0 2012-10-18 11:34 test-input
-rw-r--r-- 1 zhengqiu supergroup 12814 2012-10-18 11:34 test-input/x
drwxr-xr-x - zhengqiu supergroup 0 2012-10-18 11:40 test-output
-rw-r--r-- 1 zhengqiu supergroup 0 2012-10-18 11:40 test-output/_SUCCESS
-rw-r--r-- 1 zhengqiu supergroup 11718 2012-10-18 11:40 test-output/part-r-00000

Show word count statistics
zcai@hadoop:~/hadoop-2.0.1-alpha/bin$ ./hadoop fs -cat test-output/part-r-00000

Go to http://192.168.10.2:8088 (yarn.resourcemanager.webapp.address in yarn-site.xml) to see the job in the ResourceManager web interface.

Done.


If the data are on a Hadoop datanode or namenode, we can use hadoop fs -put or hadoop fs -copyFromLocal to upload them to HDFS. But if the data are on a machine outside the Hadoop cluster, these commands cannot be used.

This Java program uploads data to HDFS directly from a remote machine, so there is no need to download the data to a Hadoop node first and then upload them to HDFS with hadoop fs -put or hadoop fs -copyFromLocal.

To run it, a recent version of Java is required. Older versions of Java have a bug that limits the maximum supported file size to 4 GB; if the file to be transferred is larger than that, an exception occurs.

import java.net.URL;
import java.net.URI;
import java.net.URLConnection;
import java.io.InputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.MalformedURLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;

class RemoteUploadData {
public static void main(String[] args) throws Exception {
if (args.length < 3) {
System.err.println(“Usage: java RemoteUploadData <URL_data_source> <full_path_file_name_on_HDFS> <hdfs_name_URI> <replication factor>”);
System.err.println(“For example : java RemoteUploadData ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/pilot2_high_cov_GRCh37_bams/data/NA19240/alignment/NA19240.chrom1.SOLID.bfast.YRI.high_coverage.20100311.bam /user/root/input/test_input.bam hdfs://ip-10-224-53-114.ec2.internal:50001″);
System.err.println(“This will upload the file from the ftp site to your hadoop file system.”);
System.exit(2);
}

try {
//String data_file = “ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/pilot2_high_cov_GRCh37_bams/data/NA19240/alignment/NA19240.chrom1.SOLID.bfast.YRI.high_coverage.20100311.bam“;
String data_file = args[0];
URL url = new URL(data_file);
URLConnection con = url.openConnection();
long fileSize = Long.parseLong(con.getHeaderField(“content-length”));

/*
Map fields = con.getHeaderFields();
Set set = fields.entrySet();
Iterator iterator = set.iterator();
while(iterator.hasNext()) {
Map.Entry me = (Map.Entry)iterator.next();
System.out.print(me.getKey() + “: “);
System.out.println(me.getValue());
}
*/
//InputStream is = con.getInputStream();
InputStream is = url.openStream();
//BufferedInputStream bis = new BufferedInputStream(is);

Configuration conf = new Configuration();
//file to be created
//Path file = new Path("/user/root/input/test_input.bam");
Path file = new Path(args[1]);
//initiate hdfs
DistributedFileSystem dfs = new DistributedFileSystem();
//dfs.initialize(new URI("hdfs://ip-10-224-53-114.ec2.internal:50001"), conf); //fs.default.name
dfs.initialize(new URI(args[2]), conf);
FsPermission permissions = new FsPermission("750");
FSDataOutputStream out = null;
int bufferSize = 65536000;
long blockSize = 65536000; // 62.5 MiB (128000 * 512 bytes), slightly under 64 MiB
// round up, and never let the divisor used for the progress bar be zero
int totalBlocks = (int) Math.max(1, (fileSize + blockSize - 1) / blockSize);
//System.out.println(totalBlocks);
boolean overWrite = true;
try{
out = dfs.create(file,permissions,overWrite, bufferSize, (short)3, blockSize, null);
}catch(Exception e){
e.printStackTrace();
}

byte[] buf = new byte[bufferSize];
int n = 0; // number of bytes currently held in buf
/*
n = is.read(buf);
while (n >= 0){
out.write(buf, 0, n);
System.out.print(n+".");
n = is.read(buf);
}
*/
//dealing with network inputStream, block until the buf is fully filled
int end = 0;
double blockRead = 0;//generates double in the division operation, avoid casting
while (true){
while (n != buf.length){
int ret = is.read(buf, n, buf.length - n);
if (ret == -1) {
end = 1;
break;
}
n += ret;
}
out.write(buf, 0, n);
blockRead++;
if (fileSize > 0){
updateProgress((blockRead/totalBlocks));
}
//System.out.print(".");
n = 0;
if (end == 1){
break;
}
}

out.close();
is.close();
//bis.close();

} catch (MalformedURLException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

static void updateProgress(double progressPercentage) {
final int width = 100; // progress bar width in chars
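System.out.print("\r[");
// clamp to [0, 1] so the bar never grows beyond width characters
int filled = (int) (Math.min(1.0, Math.max(0.0, progressPercentage)) * width);
int i = 0;
for (; i < filled; i++) {
System.out.print(">");
}
for (; i < width; i++) {
System.out.print("|");
}
System.out.print("]");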
}
}
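
The inner loop of the program keeps calling read() until the buffer is full, because a network stream may hand back fewer bytes than requested. Here is a minimal stand-alone sketch of that idea using only the JDK. The names BufferFill, fill and TrickleStream are hypothetical and only serve the illustration; they are not part of the program above.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class BufferFill {
    /**
     * Reads from in until buf is full or the stream ends.
     * Returns the number of bytes placed in buf (0 at end of stream).
     */
    static int fill(InputStream in, byte[] buf) throws IOException {
        int n = 0;
        while (n < buf.length) {
            int ret = in.read(buf, n, buf.length - n);
            if (ret == -1) {
                break; // end of stream: return what we have so far
            }
            n += ret;
        }
        return n;
    }

    /** Hypothetical test stream that hands out at most 3 bytes per read() call. */
    static class TrickleStream extends ByteArrayInputStream {
        TrickleStream(byte[] data) {
            super(data);
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) {
            return super.read(b, off, Math.min(len, 3));
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new TrickleStream(new byte[10]);
        byte[] buf = new byte[8];
        System.out.println(in.read(buf, 0, buf.length)); // a single read() delivers only 3 bytes
        System.out.println(fill(in, buf));               // fill() collects the remaining 7
    }
}
```

Starting the counter at 0 and treating -1 as "stop and write what is there" also covers an empty stream, where the very first read() already returns -1.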

How to compile it?
Specify java classpath and include the following four jars:
commons-logging-1.1.1.jar
hadoop-core-0.20.203.0.jar
commons-configuration-1.6.jar
commons-lang-2.4.jar

jdk1.7.0_04/bin/javac -classpath .:/usr/local/hadoop-0.20.203.0/lib/commons-logging-1.1.1.jar:/usr/local/hadoop-0.20.203.0/hadoop-core-0.20.203.0.jar:/usr/local/hadoop-0.20.203.0/lib/commons-configuration-1.6.jar:/usr/local/hadoop-0.20.203.0/lib/commons-lang-2.4.jar RemoteUploadData.java

How to run it?
jdk1.7.0_04/bin/java -classpath .:/usr/local/hadoop-0.20.203.0/lib/commons-logging-1.1.1.jar:/usr/local/hadoop-0.20.203.0/hadoop-core-0.20.203.0.jar:/usr/local/hadoop-0.20.203.0/lib/commons-configuration-1.6.jar:/usr/local/hadoop-0.20.203.0/lib/commons-lang-2.4.jar RemoteUploadData http://ec2-184-73-123-11.compute-1.amazonaws.com/cnv_alignment_files/control_coverage_5.sam /user/root/input/test_input.bam3 hdfs://ip-10-224-53-114.ec2.internal:50001

Read Full Post »

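Solution: increase the memory available to the map task JVMs, for example: hadoop jar XXX.jar -Dmapred.map.child.java.opts="-Xmx3000m" (the -D option is only picked up if the driver parses the generic options, e.g. with GenericOptionsParser).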

Read Full Post »

In the driver class (the one with the main method), set the parameters to be passed:
Configuration conf = new Configuration();
conf.set("name1", "value1");
conf.set("name2", "value2");
Job job = new Job(conf, "job description"); // this line must come after the conf.set lines above, otherwise the parameters are not passed to the job

In the mapper/reducer class, get the passed parameters:
Configuration conf = context.getConfiguration();
String value1 = conf.get("name1");
String value2 = conf.get("name2");
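
The ordering requirement is what you would expect if the Job keeps its own copy of the Configuration it is given. The following plain-Java sketch mimics that copy-on-construction behaviour with java.util.Properties. SnapshotJob is a hypothetical stand-in for illustration only, not a Hadoop class.

```java
import java.util.Properties;

class SnapshotDemo {
    /** Hypothetical stand-in for Job: copies the settings it is given at construction time. */
    static class SnapshotJob {
        private final Properties settings = new Properties();

        SnapshotJob(Properties conf) {
            settings.putAll(conf); // a copy, not a shared reference
        }

        String get(String name) {
            return settings.getProperty(name);
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("name1", "value1");   // set before the job is created
        SnapshotJob job = new SnapshotJob(conf);
        conf.setProperty("name2", "value2");   // set after the job is created

        System.out.println(job.get("name1"));  // the job sees this one
        System.out.println(job.get("name2"));  // the job never received this one
    }
}
```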

Read Full Post »

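A single read() call on an HDFS input stream may return fewer bytes than requested; in particular, it does not read across block boundaries. Use readFully() instead: it keeps reading until the requested number of bytes has been delivered, so data that spans several blocks across the cluster can be read with one call.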
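
The difference can be tried without a cluster, because readFully() comes from java.io.DataInputStream, which FSDataInputStream extends. The sketch below is only an imitation: BlockedStream is a hypothetical in-memory stream whose read() stops at every multiple of blockSize, standing in for a block boundary.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

class ReadFullyDemo {
    /** Hypothetical stream whose read() never crosses a multiple of blockSize. */
    static class BlockedStream extends ByteArrayInputStream {
        private final int blockSize;

        BlockedStream(byte[] data, int blockSize) {
            super(data);
            this.blockSize = blockSize;
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) {
            int untilBoundary = blockSize - (pos % blockSize); // bytes left in the current block
            return super.read(b, off, Math.min(len, untilBoundary));
        }
    }

    /** Number of bytes a single read() call delivers when want bytes are requested. */
    static int oneRead(byte[] data, int blockSize, int want) throws IOException {
        DataInputStream in = new DataInputStream(new BlockedStream(data, blockSize));
        return in.read(new byte[want], 0, want);
    }

    /** Reads exactly want bytes with readFully(); throws EOFException if the data is too short. */
    static byte[] fullRead(byte[] data, int blockSize, int want) throws IOException {
        DataInputStream in = new DataInputStream(new BlockedStream(data, blockSize));
        byte[] buf = new byte[want];
        in.readFully(buf);
        return buf;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(oneRead(data, 4, 10));         // stops at the first boundary
        System.out.println(fullRead(data, 4, 10).length); // all requested bytes
    }
}
```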

Read Full Post »

This example demonstrates how to use Hadoop to count DNA bases.
Notice: The method used in this example is not efficient; the example aims to show as many features of Hadoop as possible. Specifically, it shows a custom InputFormat and RecordReader, a custom partitioner, a sort comparator and a grouping comparator.

The input file for this test is a fasta file:

>a
cGTAaccaataaaaaaacaagcttaacctaattc
>a
cggcGGGcagatcta
>b
agcttagTTTGGatctggccgggg
>c
gcggatttactcCCCCCAAAAANNaggggagagcccagataaatggagtctgtgcgtccaca
gaattcgcacca
>c
gcggatttactcaggggagagcccagGGataaatggagtctgtgcgtccaca
gaattcgcacca
>d
tccgtgaaacaaagcggatgtaccggatNNttttattccggctatggggcaa
ttccccgtcgcggagcca
>d
atttatactcatgaaaatcttattcgagttNcattcaagacaagcttgaca
ttgatctacagaccaacagtacttacaaagaATGCCGaaatttaaaatgtggtcac

The file contains the bases [ATGCatgcN]. We want to count the occurrences of each base, ignoring case. The example uses a customized FastaInputFormat to split the file and a FastaRecordReader to process each split. The mapper emits every single character of each fasta record with a count of 1, such as <a,1>, <T,1>, <G,1>, <g,1>. We use 3 reducers, and the partitioner distributes the mapper output among them: [ATat] goes to reducer 0, [GCgc] to reducer 1, and everything else to reducer 2. A custom sort comparator sorts the key-value pairs before they are passed to the reducers. It compares the keys ignoring case, so that the same DNA base in different cases ends up next to each other. By default the grouping comparator is the same as the sort comparator: if we call setSortComparatorClass and leave setGroupingComparatorClass unset, grouping uses the comparator we set for sorting. The reverse is not true: calling only setGroupingComparatorClass does not change the sort order. You can use this example to test it.
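
To see what the job should produce before running it on a cluster, the data flow can be imitated in plain Java. This is only a sketch under assumptions: BaseCountSketch, partition and count are hypothetical names, header lines starting with ">" are skipped, and a TreeMap with a case-insensitive order plays the role of sorting and grouping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BaseCountSketch {
    /** Partitioning rule from the text: A/T -> 0, G/C -> 1, everything else -> 2. */
    static int partition(char base) {
        switch (Character.toUpperCase(base)) {
            case 'A': case 'T': return 0;
            case 'G': case 'C': return 1;
            default: return 2;
        }
    }

    /** Counts the bases of a fasta text per reducer, ignoring case and skipping header lines. */
    static List<Map<String, Integer>> count(String fasta) {
        List<Map<String, Integer>> reducers = new ArrayList<Map<String, Integer>>();
        for (int r = 0; r < 3; r++) {
            // a case-insensitive key order puts "a" and "A" into the same entry
            reducers.add(new TreeMap<String, Integer>(String.CASE_INSENSITIVE_ORDER));
        }
        for (String line : fasta.split("[\\r\\n]+")) {
            if (line.startsWith(">")) {
                continue; // header line, not sequence data
            }
            for (char c : line.toCharArray()) {
                Map<String, Integer> counts = reducers.get(partition(c));
                String key = String.valueOf(c);
                Integer old = counts.get(key);
                counts.put(key, old == null ? 1 : old + 1);
            }
        }
        return reducers;
    }

    public static void main(String[] args) {
        // one map per reducer; keys keep the case in which a base was first seen
        System.out.println(count(">a\ncGTAacc\n>b\nagNN\n"));
    }
}
```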

The code for each file is pasted below:

FastaInputFormat.java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* FastaInputFormat, adapted from NLineInputFormat. Instead of N lines, each
* split holds N fasta records, where a record starts at a header line
* containing ">". N is read from the LINES_PER_MAP property and defaults to 1.
*
* The records of a split are read by FastaRecordReader;
* (k,v) is (LongWritable, Text), with the key being the offset in the file.
* The splits carry no host hints, so they can be scheduled anywhere in the cluster.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FastaInputFormat extends FileInputFormat<LongWritable, Text> {
public static final String LINES_PER_MAP =
"mapreduce.input.lineinputformat.linespermap";

public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
return new FastaRecordReader();
}

/**
* Logically splits the set of input files for the job, splits N lines
* of the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job)
throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status,
job.getConfiguration(), numLinesPerSplit));
}
return splits;
}

public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit> ();
Path fileName = status.getPath();
if (status.isDir()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
/**
while ((num = lr.readLine(line)) > 0) {
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
if (begin == 0) {
splits.add(new FileSplit(fileName, begin, length - 1,
new String[] {}));
} else {
splits.add(new FileSplit(fileName, begin - 1, length,
new String[] {}));
}
begin += length;
length = 0;
numLines = 0;
}
}
if (numLines != 0) {
splits.add(new FileSplit(fileName, begin, length, new String[]{}));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
*/
long record_length = 0;
int recordsRead = 0;
while ((num = lr.readLine(line)) > 0) {
if (line.toString().indexOf(">") >= 0){
recordsRead++;
}
if (recordsRead > numLinesPerSplit){
splits.add(new FileSplit(fileName, begin, record_length, new String[]{}));
begin = length;
record_length = 0;
recordsRead = 1;
}

length += num;
record_length += num;

}
splits.add(new FileSplit(fileName, begin, record_length, new String[]{}));

} finally {
if (lr != null) {
lr.close();
}
}
return splits;
}

/**
* Set the number of lines per split
* @param job the job to modify
* @param numLines the number of lines per split
*/
public static void setNumLinesPerSplit(Job job, int numLines) {
job.getConfiguration().setInt(LINES_PER_MAP, numLines);
}

/**
* Get the number of lines per split
* @param job the job
* @return the number of lines per split
*/
public static int getNumLinesPerSplit(JobContext job) {
return job.getConfiguration().getInt(LINES_PER_MAP, 1);
}
}
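
The split computation in getSplitsForFile can be tried on an in-memory string. The sketch below is a simplified re-statement, not the class above: FastaSplitSketch and splits are hypothetical names, a record is assumed to start at a line beginning with ">", and character positions are used as offsets, which equals byte offsets only for ASCII input.

```java
import java.util.ArrayList;
import java.util.List;

class FastaSplitSketch {
    /** Returns {begin, length} pairs so that each split holds at most recordsPerSplit fasta records. */
    static List<long[]> splits(String content, int recordsPerSplit) {
        List<long[]> result = new ArrayList<long[]>();
        long begin = 0;    // offset where the current split starts
        int records = 0;   // header lines seen in the current split
        int lineStart = 0; // offset of the line being examined
        while (lineStart < content.length()) {
            int nl = content.indexOf('\n', lineStart);
            int lineEnd = (nl == -1) ? content.length() : nl + 1; // include the newline
            if (content.charAt(lineStart) == '>') {
                records++;
                if (records > recordsPerSplit) {
                    // the current header opens a new split; close the previous one
                    result.add(new long[] {begin, lineStart - begin});
                    begin = lineStart;
                    records = 1;
                }
            }
            lineStart = lineEnd;
        }
        if (lineStart > begin) {
            result.add(new long[] {begin, lineStart - begin}); // remaining records
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : splits(">a\nAC\n>b\nGT\n>c\nA\n", 2)) {
            System.out.println("begin=" + s[0] + " length=" + s[1]);
        }
    }
}
```

Every split begins exactly at a header line, which is what lets the record reader start reading a record without looking at the previous split.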

FastaRecordReader.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* Treats keys as offset in file and value as line.
*/
public class FastaRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(FastaRecordReader.class);

private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;

FSDataInputStream fileIn;
Configuration job;

public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);

// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}

public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;

/**
while (pos < end) {
newSize = in.readLine(value, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}

// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
*/
LOG.info("##########################");
StringBuilder text = new StringBuilder();
int record_length = 0;
Text line = new Text();
int recordsRead = 0;
// key was set above to the offset of the first line of this value
while (pos < end) {
newSize = in.readLine(line, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
if (newSize == 0) { // end of file
break;
}

if(line.toString().indexOf(">") >= 0){
if(recordsRead > 9){//10 fasta records each time
value.set(text.toString());
// the LineReader has buffered past pos, so rewind to the start of this header line
fileIn.seek(pos);
in = new LineReader(fileIn, job);
return true;
}
recordsRead++;
}

record_length += newSize;
text.append(line.toString());
text.append("\n");
pos += newSize;
}
if (record_length == 0){
return false;
}
value.set(text.toString());
return true;

}

@Override
public LongWritable getCurrentKey() {
return key;
}

@Override
public Text getCurrentValue() {
return value;
}

/**
* Get the progress within the split
*/
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}

public synchronized void close() throws IOException {
if (in != null) {
in.close();
}
}
}
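
The batching done in nextKeyValue() (several fasta records collected into one value) can be sketched with a plain BufferedReader. FastaBatchSketch and batches are hypothetical names, and a record is assumed to start at a line beginning with ">"; this is an illustration of the idea, not a replacement for the record reader above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

class FastaBatchSketch {
    /** Groups the lines of a fasta text into chunks of at most recordsPerBatch records. */
    static List<String> batches(BufferedReader in, int recordsPerBatch) throws IOException {
        List<String> result = new ArrayList<String>();
        StringBuilder current = new StringBuilder();
        int records = 0;
        String line;
        while ((line = in.readLine()) != null) {
            if (line.startsWith(">")) {
                if (records == recordsPerBatch) {
                    result.add(current.toString()); // batch is full, start a new one
                    current.setLength(0);
                    records = 0;
                }
                records++;
            }
            current.append(line).append('\n');
        }
        if (current.length() > 0) {
            result.add(current.toString()); // last, possibly smaller batch
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(new StringReader(">a\nAC\n>b\nGT\n>c\nA\n"));
        for (String batch : batches(in, 2)) {
            System.out.println("--- batch ---");
            System.out.print(batch);
        }
    }
}
```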

CountBaseMapper.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

public class CountBaseMapper
extends Mapper<Object, Text, Text, IntWritable>
{

private Text base = new Text();
private IntWritable one = new IntWritable(1);
private static final Log LOG = LogFactory.getLog(CountBaseMapper.class);

public void map(Object key,
Text value,
Context context)
throws IOException, InterruptedException
{
//System.err.println(String.format("[map] key: (%s), value: (%s)", key, value));
// the value holds one or more fasta records; split it into lines
String fasta = value.toString();
String[] lines = fasta.split("[\\r\\n]+");
LOG.info("#############");
for (String l : lines) {
if (l.startsWith(">")) {
continue; // fasta header line, not sequence data
}
for (char c : l.toCharArray()) {
LOG.info(">" + c + "<");
base.set(String.valueOf(c));
context.write(base, one);
}
}
/**
StringTokenizer tokenizer = new StringTokenizer(value.toString(), " \t\n\r\f,.:;?![]'");
while (tokenizer.hasMoreTokens())
{
// make the words lowercase so words like "an" and "An" are counted as one word
String s = tokenizer.nextToken().toLowerCase().trim();
System.err.println(String.format("[map, in loop] token: (%s)", s));
word.set(s);
context.write(word, one);
}
*/
}
}

CountBaseReducer.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

public class CountBaseReducer
extends Reducer<Text, IntWritable, Text, IntWritable> // the first two types must match the output of map; the last two are the output of reduce
{
//private IntWritable occurrencesOfWord = new IntWritable();
private static final Log LOG = LogFactory.getLog(CountBaseReducer.class);

public void reduce(Text key,
Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException
{
LOG.info("--------------reducer----------------");
int count = 0;
IntWritable out = new IntWritable();
for(IntWritable val:values){
count += val.get(); // sum the values so the result stays correct if a combiner is enabled
}
out.set(count);
LOG.info("<"+key.toString()+">");
//key.set(">"+key.toString()+"<");
context.write(key,out);

}
}

BasePartitioner.java
import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by bases{A,T,G,C,a,t,g,c}. */
public class BasePartitioner<K, V> extends Partitioner<K, V> {

public int getPartition(K key, V value,
int numReduceTasks) {
String base = key.toString();
int partition;
if (base.equalsIgnoreCase("A") || base.equalsIgnoreCase("T")) {
partition = 0;
} else if (base.equalsIgnoreCase("C") || base.equalsIgnoreCase("G")) {
partition = 1;
} else {
partition = 2;
}
// stay within range if the job runs with fewer than 3 reducers
return partition % numReduceTasks;
}

}

BaseComparator.java
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class BaseComparator extends WritableComparator {
protected BaseComparator(){
super(Text.class,true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
Text t1 = (Text) w1;
Text t2 = (Text) w2;
//compare bases ignoring cases
String s1 = t1.toString().toUpperCase();
String s2 = t2.toString().toUpperCase();
int cmp = s1.compareTo(s2);
return cmp;
}
}
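
What the comparator changes can be seen without Hadoop: keys are sorted, and neighbouring keys that compare as equal form one group, which corresponds to one reduce() call. The sketch below imitates this with plain lists. GroupingSketch, sortAndGroup and the two comparators are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class GroupingSketch {
    /** Plain case-sensitive string order, as used when no custom comparator is set here. */
    static final Comparator<String> CASE_SENSITIVE = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            return a.compareTo(b);
        }
    };

    /** Same idea as BaseComparator: compare the upper-cased strings. */
    static final Comparator<String> IGNORE_CASE = new Comparator<String>() {
        @Override
        public int compare(String a, String b) {
            return a.toUpperCase().compareTo(b.toUpperCase());
        }
    };

    /** Sorts the keys with cmp, then merges neighbours that cmp considers equal into one group. */
    static List<List<String>> sortAndGroup(List<String> keys, Comparator<String> cmp) {
        List<String> sorted = new ArrayList<String>(keys);
        Collections.sort(sorted, cmp);
        List<List<String>> groups = new ArrayList<List<String>>();
        for (String k : sorted) {
            if (!groups.isEmpty()) {
                List<String> last = groups.get(groups.size() - 1);
                if (cmp.compare(last.get(0), k) == 0) {
                    last.add(k); // same group as the previous key
                    continue;
                }
            }
            List<String> group = new ArrayList<String>();
            group.add(k);
            groups.add(group);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "T", "A", "t", "a");
        System.out.println(sortAndGroup(keys, CASE_SENSITIVE)); // upper and lower case stay apart
        System.out.println(sortAndGroup(keys, IGNORE_CASE));    // one group per base
    }
}
```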

CountBaseDriver.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
public class CountBaseDriver {

/**
* The "driver" class. It sets everything up, then starts the job.
*/
public static void main(String[] args)
throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: CountBaseDriver <inputFile> <outputDir>");
System.exit(2);
}
Job job = new Job(conf, "count bases");
job.setJarByClass(CountBaseMapper.class);
job.setMapperClass(CountBaseMapper.class);
job.setNumReduceTasks(3);
//job.setCombinerClass(CountBaseReducer.class);
job.setReducerClass(CountBaseReducer.class);
job.setInputFormatClass(FastaInputFormat.class);
job.setPartitionerClass(BasePartitioner.class);
job.setSortComparatorClass(BaseComparator.class); // if no grouping comparator is set, the sort comparator is also used for grouping, so setGroupingComparatorClass does not need to be called explicitly. The reverse is not true. You can change the settings to test it.
//job.setGroupingComparatorClass(BaseComparator.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Compile the source code files (make sure all the required jars are included in the classpath):
javac -classpath .:/home/ubuntu/hadoop-1.0.1/hadoop-core-1.0.1.jar:/home/ubuntu/hadoop-1.0.1/commons-logging-1.1.1/commons-logging-1.1.1.jar:/home/ubuntu/hadoop-1.0.1/lib/commons-cli-1.2.jar:/home/ubuntu/hadoop-1.0.1/contrib/streaming/hadoop-streaming-1.0.1.jar *.java

create the jar:
jar cef CountBaseDriver countbasedriver.jar .

put your input fasta files in input and run hadoop:
bin/hadoop fs -mkdir input
bin/hadoop fs -put input/test_fasta.fasta input
bin/hadoop jar countbasedriver.jar input output

results:

[Image: results without the customized SortComparatorClass]

[Image: results with the customized SortComparatorClass]

Download the source code jar file

Read Full Post »

core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

hdfs-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<!-- dfs.name.dir and dfs.data.dir cannot be in the same directory -->
<name>dfs.name.dir</name>
<value>/mnt/name</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/mnt/data</value>
</property>
</configuration>

mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
<property>
<name>mapred.local.dir</name>
<value>/mnt</value>
</property>
</configuration>

The masters file lists the host of the Secondary NameNode:
masters
localhost

hadoop-env.sh
# Set Hadoop-specific environment variables here.

# The only required environment variable is JAVA_HOME. All others are
# optional. When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.

# The java implementation to use. Required.
export JAVA_HOME=/home/ubuntu/jdk1.7.0_03

# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH=

# The maximum amount of heap to use, in MB. Default is 1000.
# export HADOOP_HEAPSIZE=2000

# Extra Java runtime options. Empty by default.
# export HADOOP_OPTS=-server

# Command specific options appended to HADOOP_OPTS when specified
export HADOOP_NAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_NAMENODE_OPTS"
export HADOOP_SECONDARYNAMENODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_SECONDARYNAMENODE_OPTS"
export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS"
export HADOOP_BALANCER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_BALANCER_OPTS"
export HADOOP_JOBTRACKER_OPTS="-Dcom.sun.management.jmxremote $HADOOP_JOBTRACKER_OPTS"
# export HADOOP_TASKTRACKER_OPTS=
# The following applies to multiple commands (fs, dfs, fsck, distcp etc)
# export HADOOP_CLIENT_OPTS

# Extra ssh options. Empty by default.
# export HADOOP_SSH_OPTS="-o ConnectTimeout=1 -o SendEnv=HADOOP_CONF_DIR"

# Where log files are stored. $HADOOP_HOME/logs by default.
export HADOOP_LOG_DIR=/mnt/logs

# File naming remote slave hosts. $HADOOP_HOME/conf/slaves by default.
# export HADOOP_SLAVES=${HADOOP_HOME}/conf/slaves

# host:path where hadoop code should be rsync'd from. Unset by default.
# export HADOOP_MASTER=master:/home/$USER/src/hadoop

# Seconds to sleep between slave commands. Unset by default. This
# can be useful in large clusters, where, e.g., slave rsyncs can
# otherwise arrive faster than the master can service them.
# export HADOOP_SLAVE_SLEEP=0.1

# The directory where pid files are stored. /tmp by default.
export HADOOP_PID_DIR=/mnt/pids

# A string representing this instance of hadoop. $USER by default.
# export HADOOP_IDENT_STRING=$USER

# The scheduling priority for daemon processes. See 'man nice'.
# export HADOOP_NICENESS=10

Set up passphraseless ssh:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

Format a new distributed-filesystem:
$ bin/hadoop namenode -format

Start the hadoop daemons:
$ bin/start-all.sh

Check if Hadoop is running correctly (replace ubuntu with your user name):
$ ps U ubuntu
[Image: ps output listing the five Hadoop java processes]
If the five java processes (NameNode, SecondaryNameNode, DataNode, JobTracker and TaskTracker) exist as shown in the figure above, Hadoop is usually working well. In some cases all five processes exist but Hadoop is still not working; in that case, check the logs of each process to find out which one has a problem.

Read Full Post »

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//package org.apache.hadoop.mapreduce.lib.input;
package org.apache.hadoop.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
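import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;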

@InterfaceAudience.Public
@InterfaceStability.Stable
public class FastaInputFormat extends FileInputFormat<LongWritable, Text> {
public static final String LINES_PER_MAP =
"mapreduce.input.lineinputformat.linespermap";

public RecordReader<LongWritable, Text> createRecordReader(
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
return new FastaRecordReader();
}

/**
* Logically splits the set of input files for the job, splits N fasta sequences
* of the input as one split.
*
* @see FileInputFormat#getSplits(JobContext)
*/
public List<InputSplit> getSplits(JobContext job)
throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status,
job.getConfiguration(), numLinesPerSplit));
}
return splits;
}

public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, int numLinesPerSplit) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit> ();
Path fileName = status.getPath();
if (status.isDir()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
long begin = 0;
long length = 0;
int num = -1;
/**
while ((num = lr.readLine(line)) > 0) {
numLines++;
length += num;
if (numLines == numLinesPerSplit) {
// NLineInputFormat uses LineRecordReader, which always reads
// (and consumes) at least one character out of its upper split
// boundary. So to make sure that each mapper gets N lines, we
// move back the upper split limits of each split
// by one character here.
if (begin == 0) {
splits.add(new FileSplit(fileName, begin, length - 1,
new String[] {}));
} else {
splits.add(new FileSplit(fileName, begin - 1, length,
new String[] {}));
}
begin += length;
length = 0;
numLines = 0;
}
}
if (numLines != 0) {
splits.add(new FileSplit(fileName, begin, length, new String[]{}));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
*/
long record_length = 0;
int recordsRead = 0;
while ((num = lr.readLine(line)) > 0) {
if (line.toString().indexOf(">") >= 0){
recordsRead++;
}
if (recordsRead > numLinesPerSplit){
splits.add(new FileSplit(fileName, begin, record_length, new String[]{}));
begin = length;
record_length = 0;
recordsRead = 1;
}

length += num;
record_length += num;

}
splits.add(new FileSplit(fileName, begin, record_length, new String[]{}));

} finally {
if (lr != null) {
lr.close();
}
}
//LOG.info(splits.size() + " map tasks");
return splits;
}

/**
* Set the number of lines per split
* @param job the job to modify
* @param numLines the number of lines per split
*/
public static void setNumLinesPerSplit(Job job, int numLines) {
job.getConfiguration().setInt(LINES_PER_MAP, numLines);
}

/**
* Get the number of lines per split
* @param job the job
* @return the number of lines per split
*/
public static int getNumLinesPerSplit(JobContext job) {
return job.getConfiguration().getInt(LINES_PER_MAP, 1);
}
}

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//package org.apache.hadoop.mapreduce.lib.input;
package org.apache.hadoop.streaming;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

/**
* Treats keys as offset in file and value as line.
*/
public class FastaRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(FastaRecordReader.class);

private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;

public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);

// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
boolean skipFirstLine = false;
if (codec != null) {
in = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
start += in.readLine(new Text(), 0,
(int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
}

public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
/**
while (pos < end) {
newSize = in.readLine(value, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}

// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
*/
// returns the next fasta header line; the lines before it are skipped and logged
boolean found = false;
while (pos < end) {
key.set(pos);

newSize = in.readLine(value, maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
maxLineLength));
if (newSize == 0) { // end of file
break;
}
pos += newSize;
String text = value.toString();
if (text.indexOf(">") >= 0){
found = true;
break;
}
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
// false once the split is exhausted, so that the mapper terminates
return found;

}

@Override
public LongWritable getCurrentKey() {
return key;
}

@Override
public Text getCurrentValue() {
return value;
}

/**
* Get the progress within the split
*/
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float)(end - start));
}
}

public synchronized void close() throws IOException {
if (in != null) {
in.close();
}
}
}

Read Full Post »
