-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhadoopScript.py
89 lines (66 loc) · 2.2 KB
/
hadoopScript.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python
#reduce2.py
import string
import sys
import os
import subprocess
# Driver for an iterative Hadoop-streaming job.
#
# The first job reads the original input file.  Every following job reads the
# part files written by the previous job and writes to a new output directory.
# After each job the "numgreynodes=" value (the number of grey nodes) is read
# from that job's stderr; the loop ends once the value read is not positive.

HADOOP_STREAMING_JAR = (
    "/home/instructor/hadoop-streaming/hadoop-streaming-2.7.3.jar"
)
FIRST_INPUT = "/user/mbechirn/a2q2input/q2input.txt"
OUTPUT_BASE = "/user/mbechirn/a2q2.1"
GREY_COUNTER_MARKER = "numgreynodes="


def build_job_command(input_paths, output_dir):
    """Return the streaming-job command as an argument list.

    input_paths -- HDFS paths, each passed to the job as its own ``-input``.
    output_dir  -- HDFS directory passed as ``-output``.
    """
    command = [
        "hadoop", "jar", HADOOP_STREAMING_JAR,
        "-D", "stream.map.output.field.separator=.",
        "-D", "stream.num.map.output.key.fields=2",
        "-files", "q2mapper2.py,q2reducer2.py",
        "-mapper", "q2mapper2.py",
        "-reducer", "q2reducer2.py",
    ]
    for path in input_paths:
        command.extend(["-input", path])
    command.extend(["-output", output_dir])
    return command


def parse_grey_count(job_log):
    """Return the integer following the first "numgreynodes=" in *job_log*.

    Returns None when the log is empty, the marker is absent, or the text
    between the marker and the end of its line is not an integer.
    """
    if not job_log or GREY_COUNTER_MARKER not in job_log:
        return None
    after_marker = job_log.split(GREY_COUNTER_MARKER, 1)[1]
    value = after_marker.split("\n", 1)[0].strip()
    try:
        return int(value)
    except ValueError:
        return None


def parse_part_files(listing):
    """Return the part-file paths named in ``hdfs dfs -ls`` output.

    The path is taken to be the last whitespace-separated field of a line;
    lines whose last field is not a path with a base name starting with
    "part" (for example a "Found N items" header) are ignored.
    """
    paths = []
    for line in (listing or "").splitlines():
        fields = line.split()
        if not fields:
            continue
        candidate = fields[-1]
        if "/" not in candidate:
            continue
        if candidate.rsplit("/", 1)[-1].startswith("part"):
            paths.append(candidate)
    return paths


def output_dir_for(iteration):
    """Return the output directory of the 1-based job number *iteration*.

    Job 1 writes to OUTPUT_BASE; job N (N >= 2) writes to OUTPUT_BASE with
    str(N) appended, e.g. ".../a2q2.12" for the second job.
    """
    if iteration <= 1:
        return OUTPUT_BASE
    return OUTPUT_BASE + str(iteration)


def run_command(command):
    """Run *command* (an argument list) and return (stdout, stderr) as text."""
    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # Decode output so the same string handling works on Python 2 and 3.
            universal_newlines=True,
        )
    except OSError as exc:
        sys.exit("could not start {0!r}: {1}".format(command[0], exc))
    return process.communicate()


def main():
    """Run the first job, then keep chaining jobs while grey nodes remain."""
    iteration = 1
    output_dir = output_dir_for(iteration)
    _, job_log = run_command(build_job_command([FIRST_INPUT], output_dir))

    grey_count = 1
    while grey_count > 0:
        print(job_log)
        grey_count = parse_grey_count(job_log)
        if grey_count is None:
            sys.exit(
                "no '{0}' value found in the job output for {1}".format(
                    GREY_COUNTER_MARKER, output_dir
                )
            )

        # Feed every part file of the previous job into the next one.
        listing, listing_errors = run_command(
            ["hdfs", "dfs", "-ls", output_dir + "/part*"]
        )
        part_files = parse_part_files(listing)
        print(part_files)
        if not part_files:
            sys.exit(
                "no part files found under {0}: {1}".format(
                    output_dir, listing_errors
                )
            )

        # NOTE(review): as in the original control flow, one more job is
        # launched even after a zero count has been read -- confirm whether
        # that final pass is wanted.
        iteration += 1
        output_dir = output_dir_for(iteration)
        _, job_log = run_command(build_job_command(part_files, output_dir))


if __name__ == "__main__":
    main()