Add FileService as a standalone microservice, LakeFS+S3 as dataset storage #3296

Open. Wants to merge 47 commits into base: master.

Commits (47), all by bobbai00:
72b39b1  add initial lake fs based implementation (Jan 15, 2025)
47fe1ab  move lakefs logic to workflow core (Jan 16, 2025)
013cc56  add uri related and lake fs document (Jan 16, 2025)
9bcb8e4  fix bugs (Jan 16, 2025)
df0b5de  a compilable version (Feb 16, 2025)
9242a5a  a runnable version (Feb 16, 2025)
59b46a1  finish jwt auth (Feb 21, 2025)
d89e9c3  make the backend work (Feb 22, 2025)
f180f0c  keep refactoring the dataset resource (Feb 23, 2025)
c329e3f  succinct the config parsing (Feb 23, 2025)
f45b602  test more APIs and closing to finish (Feb 24, 2025)
1ae1c13  fix dataset creation and version creation (Feb 25, 2025)
2ed989d  fix the presigned get (Feb 25, 2025)
c31cbcc  closing to finish the upload (Feb 25, 2025)
02a4057  refactor dataset frontend (Feb 26, 2025)
5367d9a  finish upload (Feb 26, 2025)
7d702f2  closing to finish the gui (Feb 26, 2025)
a71c2a3  delete the lakefs test as the test environment don't have it (Feb 27, 2025)
cc1718d  keep improving the backend and frontend (Feb 27, 2025)
8010bac  make the workflow be able to read from the dataset (Feb 27, 2025)
f3c4b35  adding python side dataset reader (Feb 28, 2025)
94f000d  keep improving the frontend (Feb 28, 2025)
893df71  clean up the frontend (Feb 28, 2025)
559b954  finish the export (Mar 1, 2025)
e050070  finalize the sharing feature (Mar 2, 2025)
a2f39e4  fix the delete (Mar 2, 2025)
9daac7d  recover the frontend change (Mar 3, 2025)
049e9a9  fix test (Mar 3, 2025)
3df767a  fix backend dependency and fix frontend (Mar 3, 2025)
07cb8cc  cleanup the storage config (Mar 3, 2025)
685f034  add more comments (Mar 3, 2025)
9caae5d  save the multipart chunk change on frontend (Mar 3, 2025)
9471174  recover gui changes (Mar 3, 2025)
6eef903  do the rebase (Mar 4, 2025)
d96296b  add the flag for controlling whether to select files from dataset (Mar 4, 2025)
5e6f9a8  add default values for lakeFS+S3 (Mar 4, 2025)
fd84702  fmt (Mar 4, 2025)
86c9fcb  add file service to part of the scripts (Mar 4, 2025)
2b277fb  resolve comments and fix the py udf document (Mar 6, 2025)
fb07239  fmt python (Mar 6, 2025)
6b4c960  fmt and fix the version of docker compose (Mar 6, 2025)
f8685ab  try to fix the cors issue (Mar 7, 2025)
2d9eca1  fmt py file (Mar 7, 2025)
a422c1d  add header for put (Mar 7, 2025)
e232423  fmt UDF (Mar 7, 2025)
21a8969  keep refining (Mar 9, 2025)
6accd5b  update the docker compose (Mar 10, 2025)
2 changes: 2 additions & 0 deletions core/amber/src/main/python/pytexera/__init__.py
@@ -3,6 +3,7 @@
from typing import Iterator, Optional, Union

from pyamber import *
from .storage.dataset_file_document import DatasetFileDocument
from .udf.udf_operator import (
UDFOperatorV2,
UDFTableOperator,
@@ -22,6 +23,7 @@
"UDFTableOperator",
"UDFBatchOperator",
"UDFSourceOperator",
"DatasetFileDocument",
# export external tools to be used
"overrides",
"logger",
3 changes: 3 additions & 0 deletions core/amber/src/main/python/pytexera/storage/__init__.py
@@ -0,0 +1,3 @@
from .dataset_file_document import DatasetFileDocument

__all__ = ["DatasetFileDocument"]
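
With the package export above in place, the reader can be imported from the package root as well as from the storage subpackage. A minimal sketch, assuming the pytexera package from this PR is on the path:

# Either import path works after the __init__.py changes above.
from pytexera import DatasetFileDocument
# from pytexera.storage import DatasetFileDocument  # equivalent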
81 changes: 81 additions & 0 deletions core/amber/src/main/python/pytexera/storage/dataset_file_document.py
@@ -0,0 +1,81 @@
import os
import io
import requests
import urllib.parse


class DatasetFileDocument:
def __init__(self, file_path: str):
"""
Parses the file path into dataset metadata.

:param file_path:
Expected format - "/ownerEmail/datasetName/versionName/fileRelativePath"
Example: "/[email protected]/twitterDataset/v1/california/irvine/tw1.csv"
"""
parts = file_path.strip("/").split("/")
if len(parts) < 4:
raise ValueError(
"Invalid file path format. "
"Expected: /ownerEmail/datasetName/versionName/fileRelativePath"
)

self.owner_email = parts[0]
self.dataset_name = parts[1]
self.version_name = parts[2]
self.file_relative_path = "/".join(parts[3:])

self.jwt_token = os.getenv("USER_JWT_TOKEN")
self.presign_endpoint = os.getenv("PRESIGN_API_ENDPOINT")

if not self.jwt_token:
raise ValueError(
"JWT token is required but not set in environment variables."
)
if not self.presign_endpoint:
self.presign_endpoint = "http://localhost:9092/api/dataset/presign-download"

def get_presigned_url(self) -> str:
"""
Requests a presigned URL from the API.

:return: The presigned URL as a string.
:raises: RuntimeError if the request fails.
"""
headers = {"Authorization": f"Bearer {self.jwt_token}"}
encoded_file_path = urllib.parse.quote(
f"/{self.owner_email}"
f"/{self.dataset_name}"
f"/{self.version_name}"
f"/{self.file_relative_path}"
)

params = {"filePath": encoded_file_path}

response = requests.get(self.presign_endpoint, headers=headers, params=params)

if response.status_code != 200:
raise RuntimeError(
f"Failed to get presigned URL: "
f"{response.status_code} {response.text}"
)

return response.json().get("presignedUrl")

def read_file(self) -> io.BytesIO:
"""
Reads the file content from the presigned URL.

:return: A file-like object.
:raises: RuntimeError if the retrieval fails.
"""
presigned_url = self.get_presigned_url()
response = requests.get(presigned_url)

if response.status_code != 200:
raise RuntimeError(
f"Failed to retrieve file content: "
f"{response.status_code} {response.text}"
)

return io.BytesIO(response.content)
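
For reference, a minimal usage sketch of the new reader. The email, dataset path, and token below are hypothetical; it assumes a running FileService and that USER_JWT_TOKEN (and optionally PRESIGN_API_ENDPOINT) are set in the environment, as they would be inside a Texera Python UDF:

import os

from pytexera import DatasetFileDocument

# Hypothetical values, for illustration only; must be set before construction,
# since __init__ reads them from the environment.
os.environ["USER_JWT_TOKEN"] = "<jwt issued by the Texera session>"
os.environ["PRESIGN_API_ENDPOINT"] = "http://localhost:9092/api/dataset/presign-download"

doc = DatasetFileDocument("/alice@example.com/twitterDataset/v1/california/irvine/tw1.csv")
file_obj = doc.read_file()  # io.BytesIO holding the full file content
print(file_obj.readline().decode("utf-8"))  # first line of the CSV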
edu/uci/ics/texera/web/TexeraWebApplication.scala
@@ -1,6 +1,5 @@
package edu.uci.ics.texera.web

import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.github.dirkraft.dropwizard.fileassets.FileAssetsBundle
import com.typesafe.scalalogging.LazyLogging
@@ -17,14 +16,7 @@ import edu.uci.ics.texera.web.resource.dashboard.DashboardResource
import edu.uci.ics.texera.web.resource.dashboard.admin.execution.AdminExecutionResource
import edu.uci.ics.texera.web.resource.dashboard.admin.user.AdminUserResource
import edu.uci.ics.texera.web.resource.dashboard.hub.HubResource
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.`type`.{
DatasetFileNode,
DatasetFileNodeSerializer
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.{
DatasetAccessResource,
DatasetResource
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource
import edu.uci.ics.texera.web.resource.dashboard.user.project.{
ProjectAccessResource,
ProjectResource,
@@ -89,11 +81,6 @@ class TexeraWebApplication
bootstrap.addBundle(new WebsocketBundle(classOf[CollaborationResource]))
// register scala module to dropwizard default object mapper
bootstrap.getObjectMapper.registerModule(DefaultScalaModule)

// register a new custom module and add the custom serializer into it
val customSerializerModule = new SimpleModule("CustomSerializers")
customSerializerModule.addSerializer(classOf[DatasetFileNode], new DatasetFileNodeSerializer())
bootstrap.getObjectMapper.registerModule(customSerializerModule)
}

override def run(configuration: TexeraWebConfiguration, environment: Environment): Unit = {
@@ -146,7 +133,6 @@ class TexeraWebApplication
environment.jersey.register(classOf[ResultResource])
environment.jersey.register(classOf[HubResource])
environment.jersey.register(classOf[WorkflowVersionResource])
environment.jersey.register(classOf[DatasetResource])
environment.jersey.register(classOf[DatasetAccessResource])
environment.jersey.register(classOf[ProjectResource])
environment.jersey.register(classOf[ProjectAccessResource])
edu/uci/ics/texera/web/resource/dashboard/user/dataset/DatasetResource.scala
@@ -22,8 +22,7 @@ import edu.uci.ics.texera.dao.jooq.generated.tables.daos.{
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.{
Dataset,
DatasetUserAccess,
DatasetVersion,
User
DatasetVersion
}
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetAccessResource._
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.{context, _}
@@ -200,28 +199,6 @@ object DatasetResource {
DatasetOperation(filesToAdd.toMap, filesToRemove.toList)
}

/**
* Create a new dataset version by adding new files
* @param did the target dataset id
* @param user the user submitting the request
* @param filesToAdd the map containing the files to add
* @return the created dataset version
*/
def createNewDatasetVersionByAddingFiles(
did: Integer,
user: User,
filesToAdd: Map[java.nio.file.Path, InputStream]
): Option[DashboardDatasetVersion] = {
applyDatasetOperationToCreateNewVersion(
context,
did,
user.getUid,
user.getEmail,
"",
DatasetOperation(filesToAdd, List())
)
}

// apply the dataset operation to create a new dataset version
// it returns the created dataset version if creation succeed, else return None
// concurrency control is performed here: the thread has to have the lock in order to create the new version
edu/uci/ics/texera/web/service/ResultExportService.scala
@@ -6,18 +6,17 @@ import edu.uci.ics.amber.core.storage.model.VirtualDocument
import edu.uci.ics.amber.core.tuple.Tuple
import edu.uci.ics.amber.core.virtualidentity.{OperatorIdentity, WorkflowIdentity}
import edu.uci.ics.amber.core.workflow.PortIdentity
import edu.uci.ics.amber.util.{ArrowUtils, PathUtils}
import edu.uci.ics.amber.util.ArrowUtils
import edu.uci.ics.texera.dao.jooq.generated.tables.pojos.User
import edu.uci.ics.texera.web.model.websocket.request.ResultExportRequest
import edu.uci.ics.texera.web.model.websocket.response.ResultExportResponse
import edu.uci.ics.texera.web.resource.dashboard.user.dataset.DatasetResource.createNewDatasetVersionByAddingFiles
import edu.uci.ics.texera.web.resource.dashboard.user.workflow.{
WorkflowExecutionsResource,
WorkflowVersionResource
}
import edu.uci.ics.texera.web.service.WorkflowExecutionService.getLatestExecutionId

import java.io.{FilterOutputStream, IOException, OutputStream, PipedInputStream, PipedOutputStream}
import java.io.{FilterOutputStream, IOException, OutputStream}
import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.time.LocalDateTime
@@ -33,6 +32,10 @@ import org.apache.arrow.vector.ipc.ArrowFileWriter
import org.apache.commons.lang3.StringUtils
import javax.ws.rs.WebApplicationException
import javax.ws.rs.core.StreamingOutput
import edu.uci.ics.texera.web.auth.JwtAuth
import edu.uci.ics.texera.web.auth.JwtAuth.{TOKEN_EXPIRE_TIME_IN_DAYS, dayToMin, jwtClaims}

import java.net.{HttpURLConnection, URL, URLEncoder}

/**
* A simple wrapper that ignores 'close()' calls on the underlying stream.
@@ -52,6 +55,14 @@ object ResultExportService {
// Matches the remote's approach for a thread pool
final private val pool: ThreadPoolExecutor =
Executors.newFixedThreadPool(3).asInstanceOf[ThreadPoolExecutor]

lazy val fileServiceUploadOneFileToDatasetEndpoint: String =
sys.env
.getOrElse(
"FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT",
"http://localhost:9092/api/dataset/did/upload"
)
.trim
}

class ResultExportService(workflowIdentity: WorkflowIdentity) {
@@ -156,23 +167,22 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
results: Iterable[Tuple],
headers: List[String]
): (Option[String], Option[String]) = {
val fileName = generateFileName(request, operatorId, "csv")
try {
val pipedOutputStream = new PipedOutputStream()
val pipedInputStream = new PipedInputStream(pipedOutputStream)

pool.submit(new Runnable {
override def run(): Unit = {
val writer = CSVWriter.open(pipedOutputStream)
saveToDatasets(
request,
user,
outputStream => {
val writer = CSVWriter.open(outputStream)
writer.writeRow(headers)
results.foreach { tuple =>
writer.writeRow(tuple.getFields.toIndexedSeq)
}
writer.close()
}
})

val fileName = generateFileName(request, operatorId, "csv")
saveToDatasets(request, user, pipedInputStream, fileName)
},
fileName
)
(Some(s"CSV export done for operator $operatorId -> file: $fileName"), None)
} catch {
case ex: Exception =>
@@ -202,17 +212,15 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
val field = selectedRow.getField(columnIndex)
val dataBytes: Array[Byte] = convertFieldToBytes(field)

val pipedOutputStream = new PipedOutputStream()
val pipedInputStream = new PipedInputStream(pipedOutputStream)

pool.submit(new Runnable {
override def run(): Unit = {
pipedOutputStream.write(dataBytes)
pipedOutputStream.close()
}
})

saveToDatasets(request, user, pipedInputStream, fileName)
saveToDatasets(
request,
user,
outputStream => {
outputStream.write(dataBytes)
outputStream.close()
},
fileName
)
(Some(s"Data export done for operator $operatorId -> file: $fileName"), None)
} catch {
case ex: Exception =>
@@ -242,24 +250,24 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
}

try {
val pipedOutputStream = new PipedOutputStream()
val pipedInputStream = new PipedInputStream(pipedOutputStream)
val allocator = new RootAllocator()

pool.submit(() => {
Using.Manager { use =>
val (writer, root) = createArrowWriter(results, allocator, pipedOutputStream)
use(writer)
use(root)
use(allocator)
use(pipedOutputStream)

writeArrowData(writer, root, results)
}
})

val fileName = generateFileName(request, operatorId, "arrow")
saveToDatasets(request, user, pipedInputStream, fileName)

saveToDatasets(
request,
user,
outputStream => {
val allocator = new RootAllocator()
Using.Manager { use =>
val (writer, root) = createArrowWriter(results, allocator, outputStream)
use(writer)
use(root)
use(allocator)

writeArrowData(writer, root, results)
}
},
fileName
)

(Some(s"Arrow file export done for operator $operatorId -> file: $fileName"), None)
} catch {
@@ -333,17 +341,47 @@ class ResultExportService(workflowIdentity: WorkflowIdentity) {
private def saveToDatasets(
request: ResultExportRequest,
user: User,
pipedInputStream: PipedInputStream,
fileWriter: OutputStream => Unit, // Pass function that writes data
fileName: String
): Unit = {
request.datasetIds.foreach { did =>
val datasetPath = PathUtils.getDatasetPath(did)
val filePath = datasetPath.resolve(fileName)
createNewDatasetVersionByAddingFiles(
did,
user,
Map(filePath -> pipedInputStream)
val encodedFilePath = URLEncoder.encode(fileName, StandardCharsets.UTF_8.name())
val message = URLEncoder.encode(
s"Export from workflow ${request.workflowName}",
StandardCharsets.UTF_8.name()
)

val uploadUrl = s"$fileServiceUploadOneFileToDatasetEndpoint"
.replace("did", did.toString) + s"?filePath=$encodedFilePath&message=$message"

var connection: HttpURLConnection = null
try {
val url = new URL(uploadUrl)
connection = url.openConnection().asInstanceOf[HttpURLConnection]
connection.setDoOutput(true)
connection.setRequestMethod("POST")
connection.setRequestProperty("Content-Type", "application/octet-stream")
connection.setRequestProperty(
"Authorization",
s"Bearer ${JwtAuth.jwtToken(jwtClaims(user, dayToMin(TOKEN_EXPIRE_TIME_IN_DAYS)))}"
)

// Get output stream from connection
val outputStream = connection.getOutputStream
fileWriter(outputStream) // Write directly to HTTP request output stream
outputStream.close()

// Check response
val responseCode = connection.getResponseCode
if (responseCode != HttpURLConnection.HTTP_OK) {
throw new RuntimeException(s"Failed to upload file. Server responded with: $responseCode")
}
} catch {
case e: Exception =>
throw new RuntimeException(s"Error uploading file to dataset $did: ${e.getMessage}", e)
} finally {
if (connection != null) connection.disconnect()
}
}
}

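For comparison, a sketch of the same upload flow in Python. The endpoint template and query parameters mirror the Scala code above; the dataset id, file name, and token are placeholders, and the literal "did" path segment is substituted exactly as the Scala code does:

import requests

# Default endpoint template from the Scala code above
# (FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT).
UPLOAD_ENDPOINT_TEMPLATE = "http://localhost:9092/api/dataset/did/upload"

def upload_to_dataset(did: int, file_name: str, data: bytes, jwt_token: str) -> None:
    # Substitute the dataset id into the "did" path segment.
    url = UPLOAD_ENDPOINT_TEMPLATE.replace("did", str(did))
    response = requests.post(
        url,
        params={"filePath": file_name, "message": "Export from workflow"},
        headers={
            "Authorization": f"Bearer {jwt_token}",
            "Content-Type": "application/octet-stream",
        },
        data=data,  # sent as the raw request body
    )
    if response.status_code != 200:
        raise RuntimeError(
            f"Failed to upload to dataset {did}: {response.status_code} {response.text}"
        )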