Skip to content
This repository has been archived by the owner on Oct 1, 2020. It is now read-only.

Commit

Permalink
Merge branch 'release/0.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
flahn committed Feb 6, 2018
2 parents 3c9cb56 + 0c8bb4a commit 3b3cfe1
Show file tree
Hide file tree
Showing 11 changed files with 444 additions and 259 deletions.
6 changes: 4 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: openEO.R.Backend
Title: Reference backend implementation for openEO conformant backend with a local filesystem
Version: 0.0.1
Version: 0.1
Authors@R: person("Florian", "Lahn", email = "[email protected]", role = c("aut", "cre"))
Description: The package contains a backend solution in compliance with the openEO API. In this demonstration
the file backend solution is the local file system containing some raster data collections.
Expand All @@ -10,7 +10,9 @@ Depends:
gdalUtils (>= 2.0.1.7),
raster (>= 2.6-7),
sodium (>= 1.1),
plumber (>= 0.4.4)
plumber (>= 0.4.4),
DBI (>= 0.7),
RSQLite (>= 2.0)
Encoding: UTF-8
LazyData: true
RoxygenNote: 6.0.1
Expand Down
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ export(User)
export(View)
export(isProcess)
export(isProduct)
export(migrateFilesToDB)
export(openeo.server)
import(DBI)
import(plumber)
import(raster)
importFrom(R6,R6Class)
Expand Down
1 change: 0 additions & 1 deletion R/Collection-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ Collection <- R6Class(
.collection.filterbytime = function (granules,from,to) {
minpos = -1
maxpos = -1

if (from > to) {
old <- to
to <- from
Expand Down
7 changes: 4 additions & 3 deletions R/ExecutableProcess-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ ExecutableProcess <- R6Class(
"ExecutableProcess",
inherit = Process,
public = list(
executable=FALSE,

initialize= function(process_id = NA,
description = NA,
Expand Down Expand Up @@ -54,8 +53,10 @@ ExecutableProcess <- R6Class(
detailedInfo = function() {
args = list()

args=lapply(self$args, function(argument) {argument$valueInfo()})

args=lapply(self$args, function(argument) {
# this needs to be the unlisted version since valueInfo is also used to get a simplified represenation of processes
argument$valueInfo()
})
return(list(
process_id = self$process_id,
args = args
Expand Down
101 changes: 85 additions & 16 deletions R/Job-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,40 +30,99 @@ Job <- R6Class(
user_id=NULL,
consumed_credits=NULL,

filePath = NULL,

initialize = function(job_id=NULL,filePath=NULL,process_graph=NULL,user_id = NULL) {
initialize = function(job_id=NULL,process_graph=NULL,user_id = NULL) {
if (is.null(job_id)||missing(job_id)) {
stop("Cannot create new Job. There is no job_id specified")
} else {
self$job_id = job_id
}


if (is.null(filePath) || missing(filePath)) {
stop("Cannot initialize Job without the job workspace")
} else {
self$filePath = filePath
}

if (!is.null(user_id)) {
self$user_id = user_id
}

self$consumed_credits = 0

self$process_graph = process_graph
return(self)
},

store = function(json=NA) {
dir.create(self$filePath)
write(x=json,file=paste(self$filePath,"/process_graph.json",sep=""))
store = function(con) {
exists = dbGetQuery(con,"select count(*) from job where job_id = :id",param=list(id=self$job_id)) == 1

if (!exists) {
insertIntoQuery = "insert into job (job_id,
user_id,
status,
process_graph,
submitted,
last_update,
consumed_credits) values (
:job_id, :user_id, :status, :process_graph, :submitted, :last_update, :consumed_credits
);"
dbExecute(con, insertIntoQuery, param = list(
job_id = self$job_id,
user_id = self$user_id,
status = self$status,
process_graph = encodeProcessGraph(toJSON(self$process_graph,auto_unbox = TRUE,pretty=TRUE)),
submitted=as.character(self$submitted),
last_update = as.character(self$last_update),
consumed_credits = self$consumed_credits
))

} else {
updateQuery = "update job
set
user_id = :id,
status = :status,
process_graph = :process_graph,
submitted = :submitted,
last_update = :last_update,
consumed_credits = :consumed_credits
where job_id = :job_id;"

dbExecute(con, updateQuery, param = list(
user_id = self$user_id,
status = self$status,
process_graph = encodeProcessGraph(toJSON(self$process_graph$detailedInfo(),auto_unbox = TRUE,pretty=TRUE)),
submitted = as.character(self$submitted),
last_update = as.character(self$last_update),
consumed_credits = self$consumed_credits,
job_id = self$job_id
))
}

# dir.create(self$filePath)
# write(x=json,file=paste(self$filePath,"/process_graph.json",sep=""))
},

loadProcessGraph = function() {
parsedJson = fromJSON(paste(self$filePath,"/process_graph.json",sep=""))
self$process_graph = self$loadProcess(parsedJson[["process_graph"]])
if (!isProcess(self$process_graph)) {

if (is.character(self$process_graph)) {
parsedJson = fromJSON(self$process_graph, simplifyDataFrame = FALSE)
if (!"process_graph" %in% names(parsedJson)) {
parsedJson = list(process_graph=parsedJson)
}
} else if (is.list(self$process_graph)) {
if (!"process_graph" %in% names(self$process_graph)) {
parsedJson = list(process_graph=self$process_graph)
}
} else {
jsonText = dbGetQuery(openeo.server$database, "select process_graph from job where job_id = :id", param=list(id=self$job_id))[1,]
parsedJson = fromJSON(jsonText, simplifyDataFrame = FALSE)
if (!"process_graph" %in% names(parsedJson)) {
parsedJson = list(process_graph=self$process_graph)
}
}


self$process_graph = self$loadProcess(parsedJson[["process_graph"]])
} else {
# do nothing, we already have a process graph
}
},

loadProcess = function(parsedJson) {
processId = parsedJson[["process_id"]]
#TODO: add cases for udfs
Expand Down Expand Up @@ -93,6 +152,8 @@ Job <- R6Class(
last_update = self$last_update,
consumed_credits = self$consumed_credits
)

return(info)
},

run = function() {
Expand All @@ -105,6 +166,14 @@ Job <- R6Class(
)
)

encodeProcessGraph = function(text) {
return(bin2hex(charToRaw(text)))
}

decodeProcessGraph = function(hex) {
return(rawToChar(hex2bin(hex)))
}

isJob = function(obj) {
return("Job" %in% class(obj))
}
4 changes: 1 addition & 3 deletions R/Process-class.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ Process <- R6Class(
for (arg in self$args) {
arglist = append(arglist, arg$shortInfo())
}

res = list(
process_id = self$process_id,
description = self$description,
Expand All @@ -56,9 +55,8 @@ Process <- R6Class(
if (is.null(job)) {
stop("No job defined for this executable process")
}

#return a process where the arguments from the parsed json file are set for
#this "args". E.g. set a value for args[["from"]]$value and set Process$executable to TRUE
#this "args". E.g. set a value for args[["from"]]$value

# json at this point is the named list of the process graph provided by the json stored under jobs
args = json$args
Expand Down
Loading

0 comments on commit 3b3cfe1

Please sign in to comment.