-
Notifications
You must be signed in to change notification settings - Fork 0
/
JobGenerator.py
68 lines (53 loc) · 2.37 KB
/
JobGenerator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
'''
Created on Jan 10, 2015
@author: niuzhaojie
'''
from Job import Job
from ComputeTask import ComputeTask
from Priority import Priority
from IOTask import IOTask
from FileBlock import FileBlock
from BlockType import BlockType
from Resource import Resource
class JobGenerator(object):
'''
classdocs
'''
@staticmethod
def genComputeIntensitveJob(jobID, numOfTask, memory, cpu, disk, network, execTime, submissionTime):
job = Job(jobID, submissionTime)
for i in range(numOfTask):
resource = Resource(memory, cpu, disk, network)
job.addTask(ComputeTask(jobID + "-computeTask-" + str(i), Priority.NORMAL, resource, execTime))
return job
@staticmethod
def genMapOnlyJob(jobID, inputFile, memory, cpu, disk, network, submissionTime):
job = Job(jobID, submissionTime)
blockIndex = 0
for block in inputFile.getBlockList():
resource = Resource(memory, cpu, disk, network)
job.addTask(IOTask(jobID + "-mapTask-" + str(blockIndex), Priority.NORMAL, resource, block))
blockIndex += 1
return job
@staticmethod
def genMapReduceJob(jobID, inputFile, mapMemory, mapCPU, mapDisk, mapNetwork, numOfReduce, redMemory, redCPU, redDisk, redNetwork, submissionTime):
mapTaskList = []
reduceTaskList = []
mapIndex = 0
for block in inputFile.getBlockList():
mapResRequest = Resource(mapMemory, mapCPU, mapDisk, mapNetwork)
mapTaskList.append(IOTask(jobID + "-mapTask-" + str(mapIndex), Priority.NORMAL, mapResRequest, block))
mapIndex += 1
for i in range(numOfReduce):
reduceResRequest = Resource(redMemory, redCPU, redDisk, redNetwork)
reduceTask = IOTask(jobID + "-redTask-" + str(i), Priority.HIGH, reduceResRequest,
FileBlock(jobID + "tmp" + str(i), inputFile.getFileSize() / float(numOfReduce), BlockType.INTERMEDIATE))
for mapTask in mapTaskList:
mapTask.addChild(reduceTask)
reduceTaskList.append(reduceTask)
job = Job(jobID, submissionTime)
for mapTask in mapTaskList:
job.addTask(mapTask)
for reduceTask in reduceTaskList:
job.addTask(reduceTask)
return job