Merge pull request #109 from mlverse/updates
Updates
edgararuiz authored Apr 16, 2024
2 parents 93c022e + 9903190 commit 6691373
Showing 32 changed files with 164 additions and 151 deletions.
8 changes: 3 additions & 5 deletions DESCRIPTION
@@ -1,6 +1,6 @@
Package: pysparklyr
Title: Provides a 'PySpark' Back-End for the 'sparklyr' Package
Version: 0.1.3.9000
Version: 0.1.4
Authors@R: c(
person("Edgar", "Ruiz", , "[email protected]", role = c("aut", "cre")),
person(given = "Posit Software, PBC", role = c("cph", "fnd"))
@@ -11,7 +11,7 @@ Description: It enables 'sparklyr' to integrate with 'Spark Connect', and
License: MIT + file LICENSE
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.2.3
RoxygenNote: 7.3.1
Imports:
cli,
DBI,
@@ -22,7 +22,7 @@ Imports:
reticulate (>= 1.33),
methods,
rlang,
sparklyr (>= 1.8.4.9004),
sparklyr (>= 1.8.5),
tidyselect,
fs,
magrittr,
@@ -41,5 +41,3 @@ Suggests:
tibble,
withr
Config/testthat/edition: 3
Remotes:
sparklyr/sparklyr
7 changes: 7 additions & 0 deletions NAMESPACE
@@ -27,12 +27,19 @@ S3method(ml_logistic_regression,tbl_pyspark)
S3method(ml_pipeline,pyspark_connection)
S3method(ml_predict,ml_connect_model)
S3method(ml_save,ml_connect_pipeline_stage)
S3method(ml_title,ml_model_logistic_regression)
S3method(ml_transform,ml_connect_pipeline_model)
S3method(pivot_longer,tbl_pyspark)
S3method(print,ml_connect_estimator)
S3method(print,ml_connect_model)
S3method(print,ml_output_params)
S3method(print,spark_pyobj)
S3method(python_obj_get,default)
S3method(python_obj_get,ml_connect_estimator)
S3method(python_obj_get,ml_connect_model)
S3method(python_obj_get,ml_connect_pipeline_model)
S3method(python_obj_get,python.builtin.object)
S3method(python_obj_get,spark_pyobj)
S3method(same_src,pyspark_connection)
S3method(sample_frac,tbl_pyspark)
S3method(sample_n,tbl_pyspark)
19 changes: 17 additions & 2 deletions NEWS.md
@@ -1,8 +1,23 @@
# pysparklyr dev
# pysparklyr 0.1.4

### New

* Adds support for `spark_apply()` via the `rpy2` Python library.
* Adds support for `spark_apply()` via the `rpy2` Python library
  * It will not automatically distribute packages; it assumes that the
  necessary packages are already installed on each node. This also means that
  the `packages` argument is not supported.
  * As in the original implementation, schema inference works, and as in the
  original, it has a performance cost. Unlike the original, the Databricks and
  Spark Connect version returns a 'columns' specification that you can reuse
  the next time you run the call (see the sketch below).
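
  A minimal sketch of that workflow, assuming a Databricks Connect
  connection; the cluster id and the exact `columns` string are illustrative
  placeholders, not values produced by this release:

  ``` r
  library(sparklyr)

  # Assumes DATABRICKS_HOST, DATABRICKS_TOKEN, and DATABRICKS_CLUSTER_ID
  # are already set in the environment
  sc <- spark_connect(
    method = "databricks_connect",
    cluster_id = Sys.getenv("DATABRICKS_CLUSTER_ID")
  )

  tbl_mtcars <- copy_to(sc, mtcars)

  # First run: the schema is inferred, which is slower; the result reports
  # a 'columns' specification that can be reused
  spark_apply(tbl_mtcars, nrow, group_by = "am")

  # Later runs: pass the specification to skip schema inference
  spark_apply(tbl_mtcars, nrow, group_by = "am", columns = "am double, x long")

  spark_disconnect(sc)
  ```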

### Improvements

* At connection time, Arrow is now enabled by default. This is done by setting
these two configuration settings to true (see the sketch below):
* `spark.sql.execution.arrow.pyspark.enabled`
* `spark.sql.execution.arrow.pyspark.fallback.enabled`
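
A sketch of the equivalent manual configuration, shown mainly as a way to opt
back out; the cluster id is a placeholder, and it is an assumption here that
the `config` argument is passed through to the Spark Connect session by this
connection method:

``` r
library(sparklyr)

# Hypothetical override: turn off what is now enabled by default
conf <- spark_config()
conf$spark.sql.execution.arrow.pyspark.enabled <- "false"
conf$spark.sql.execution.arrow.pyspark.fallback.enabled <- "false"

sc <- spark_connect(
  method = "databricks_connect",
  cluster_id = Sys.getenv("DATABRICKS_CLUSTER_ID"),  # assumed to be set
  config = conf
)
```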


# pysparklyr 0.1.3

34 changes: 16 additions & 18 deletions R/databricks-utils.R
@@ -1,13 +1,13 @@
databricks_host <- function(host = NULL, fail = TRUE) {
if(!is.null(host)) {
if (!is.null(host)) {
return(set_names(host, "argument"))
}
env_host <- Sys.getenv("DATABRICKS_HOST", unset = NA)
env_host <- Sys.getenv("DATABRICKS_HOST", unset = NA)
connect_host <- Sys.getenv("CONNECT_DATABRICKS_HOST", unset = NA)
if(!is.na(env_host)) {
if (!is.na(env_host)) {
host <- set_names(env_host, "environment")
}
if(!is.na(connect_host)) {
if (!is.na(connect_host)) {
host <- set_names(connect_host, "environment_connect")
}
if (is.null(host)) {
@@ -27,7 +27,7 @@ databricks_host <- function(host = NULL, fail = TRUE) {
}

databricks_token <- function(token = NULL, fail = FALSE) {
if(!is.null(token)) {
if (!is.null(token)) {
return(set_names(token, "argument"))
}
# Checks the Environment Variable
@@ -37,7 +37,7 @@ databricks_token <- function(token = NULL, fail = FALSE) {
if (!is.na(env_token)) {
token <- set_names(env_token, "environment")
} else {
if(!is.na(connect_token)) {
if (!is.na(connect_token)) {
token <- set_names(connect_token, "environment_connect")
}
}
@@ -68,8 +68,7 @@ databricks_token <- function(token = NULL, fail = FALSE) {
databricks_dbr_version_name <- function(cluster_id,
host = NULL,
token = NULL,
silent = FALSE
) {
silent = FALSE) {
bullets <- NULL
version <- NULL
cluster_info <- databricks_dbr_info(
@@ -99,17 +98,16 @@ databricks_extract_version <- function(x) {
databricks_dbr_info <- function(cluster_id,
host = NULL,
token = NULL,
silent = FALSE
) {

silent = FALSE) {
cli_div(theme = cli_colors())

if(!silent) {
if (!silent) {
cli_progress_step(
msg = "Retrieving info for cluster:}{.emph '{cluster_id}'",
msg_done = "{.header Cluster:} {.emph '{cluster_id}'} | {.header DBR: }{.emph '{version}'}",
msg_failed = "Failed contacting:}{.emph '{cluster_id}'"
)}
msg = "Retrieving info for cluster:}{.emph '{cluster_id}'",
msg_done = "{.header Cluster:} {.emph '{cluster_id}'} | {.header DBR: }{.emph '{version}'}",
msg_failed = "Failed contacting:}{.emph '{cluster_id}'"
)
}

out <- databricks_cluster_get(cluster_id, host, token)
if (inherits(out, "try-error")) {
@@ -142,7 +140,7 @@ databricks_dbr_info <- function(cluster_id,
if (as.character(substr(out, 1, 26)) == "Error in req_perform(.) : ") {
out <- substr(out, 27, nchar(out))
}
if(!silent) cli_progress_done(result = "failed")
if (!silent) cli_progress_done(result = "failed")
cli_abort(
c(
"{.header Connection with Databricks failed: }\"{trimws(out)}\"",
@@ -156,7 +154,7 @@ databricks_dbr_info <- function(cluster_id,
} else {
version <- databricks_extract_version(out)
}
if(!silent) cli_progress_done()
if (!silent) cli_progress_done()
cli_end()
out
}
9 changes: 3 additions & 6 deletions R/deploy.R
@@ -48,7 +48,6 @@ deploy_databricks <- function(
token = NULL,
confirm = interactive(),
...) {

cli_div(theme = cli_colors())

cluster_id <- cluster_id %||% Sys.getenv("DATABRICKS_CLUSTER_ID")
@@ -207,13 +206,13 @@ deploy <- function(

req_file <- path(appDir, "requirements.txt")
prev_deployments <- deployments(appDir)
if(!file_exists(req_file) && nrow(prev_deployments) == 0 && check_interactive()) {
if (!file_exists(req_file) && nrow(prev_deployments) == 0 && check_interactive()) {
cli_inform(c(
"{.header Would you like to create the 'requirements.txt' file?}",
"{.class Why consider? This will allow you to skip using `version` or `cluster_id`}"
))
choice <- menu(choices = c("Yes", "No"))
if(choice == 1) {
if (choice == 1) {
requirements_write(
destfile = req_file,
python = python
@@ -270,8 +269,7 @@ deploy_find_environment <- function(
}
}
if (is.null(ret)) {

if(is.null(exe_py)) {
if (is.null(exe_py)) {
cli_abort("No Python environment could be found")
} else {
ret <- exe_py
Expand All @@ -282,4 +280,3 @@ deploy_find_environment <- function(
cli_bullets(c("i" = "{.header Python:} {ret}"))
ret
}

7 changes: 7 additions & 0 deletions R/dplyr.R
@@ -193,10 +193,12 @@ python_obj_get <- function(x) {
UseMethod("python_obj_get")
}

#' @export
python_obj_get.ml_connect_model <- function(x) {
x$pipeline$pyspark_obj
}

#' @export
python_obj_get.default <- function(x) {
if (inherits(x, "character")) {
return(x)
@@ -207,22 +209,27 @@ python_obj_get.default <- function(x) {
}
}

#' @export
python_obj_get.python.builtin.object <- function(x) {
x
}

#' @export
python_obj_get.spark_pyobj <- function(x) {
x[["pyspark_obj"]]
}

#' @export
python_obj_get.ml_connect_model <- function(x) {
x[["pipeline"]][["pyspark_obj"]]
}

#' @export
python_obj_get.ml_connect_estimator <- function(x) {
x[[".jobj"]]
}

#' @export
python_obj_get.ml_connect_pipeline_model <- function(x) {
x[[".jobj"]]
}
4 changes: 2 additions & 2 deletions R/ide-snippet.R
@@ -18,7 +18,7 @@ connection_databricks_shinyapp <- function() {
if (!"shiny" %in% installed.packages()) {
cli_abort(
"The `shiny` package is not installed, please install and retry."
)
)
}
}

@@ -37,5 +37,5 @@ get_wrapper <- function(x, pos = -1, envir = as.environment(pos)) {
x = x,
pos = pos,
envir = envir
)
)
}
1 change: 1 addition & 0 deletions R/ml-connect-model.R
@@ -60,6 +60,7 @@ ml_title <- function(x) {
UseMethod("ml_title")
}

#' @export
ml_title.ml_model_logistic_regression <- function(x) {
"Logistic Regression"
}
6 changes: 3 additions & 3 deletions R/python-import-check.R
@@ -1,6 +1,6 @@
import_check <- function(x, envname, silent = FALSE) {
cli_div(theme = cli_colors())
if(!silent) {
if (!silent) {
cli_progress_step(
msg = "Attempting to load {.emph '{envname}'}",
msg_done = "{.header Python environment:} {.emph '{envname}'}",
@@ -30,7 +30,7 @@ import_check <- function(x, envname, silent = FALSE) {
# If there is a Python environment already loaded
if (env_found) {
find_env <- env_python(envname)
if(is.na(find_env)) {
if (is.na(find_env)) {
find_env <- ""
}
if (find_env == py_exe()) {
@@ -44,7 +44,7 @@ import_check <- function(x, envname, silent = FALSE) {
if (env_found) {
# If the envname is found, we try to use it
envir_type <- env_type(envname)
if(is.na(envir_type)) {
if (is.na(envir_type)) {
envir_type <- ""
}
if (envir_type == "virtualenv") {
16 changes: 9 additions & 7 deletions R/python-install.R
@@ -167,7 +167,7 @@ install_environment <- function(
library_info <- python_library_info(main_library, version)

if (!is.null(library_info)) {
if(is.null(python_version)) {
if (is.null(python_version)) {
python_version <- library_info$requires_python
}
version <- library_info$version
@@ -244,12 +244,14 @@ install_environment <- function(
if (new_env && method != "conda" &&
is.null(virtualenv_starter(python_version))) {
cli_abort(c(
paste0("{.header Python version} {.emph '{python_number}'}",
" {.header or higher is required by some libraries.}"
),
" " = paste0("Use: {.run reticulate::install_python",
"(version = '{python_number}:latest')} to install."
)
paste0(
"{.header Python version} {.emph '{python_number}'}",
" {.header or higher is required by some libraries.}"
),
" " = paste0(
"Use: {.run reticulate::install_python",
"(version = '{python_number}:latest')} to install."
)
))
}

14 changes: 6 additions & 8 deletions R/python-use-envname.R
@@ -6,9 +6,7 @@ use_envname <- function(
match_first = FALSE,
ignore_reticulate_python = FALSE,
ask_if_not_installed = interactive(),
main_library = NULL
) {

main_library = NULL) {
cli_div(theme = cli_colors())

ret_python <- reticulate_python_check(ignore_reticulate_python)
@@ -37,7 +35,7 @@ use_envname <- function(
match_one <- length(envs) > 0
match_exact <- length(envs[envs == envname]) > 0

if(!is.null(main_library) && !match_exact) {
if (!is.null(main_library) && !match_exact) {
lib_info <- python_library_info(main_library, fail = FALSE, verbose = FALSE)
latest_ver <- lib_info$version
install_recent <- compareVersion(latest_ver, version) == 1
@@ -48,7 +46,7 @@ use_envname <- function(
msg_default <- paste0(
"{.header You do not have a Python environment that matches your",
" {.emph {con_label}} cluster}"
)
)

msg_1 <- NULL
msg_2 <- NULL
@@ -71,7 +69,7 @@ use_envname <- function(
# to choose the most recent environment
if (match_one && !match_exact && match_first) {
ret <- set_names(envs[1], "first")
if(install_recent) {
if (install_recent) {
msg_1 <- msg_default
msg_no <- glue(" - Will use alternate environment ({ret})")
} else {
@@ -103,14 +101,14 @@ use_envname <- function(
paste0("Yes", msg_yes),
paste0("No", msg_no),
"Cancel"
))
))
if (choice == 1) {
ret <- set_names(envname, "prompt")
exec(
.fn = glue("install_{backend}"),
version = version,
as_job = FALSE
)
)
}
if (choice == 2) {
ret <- set_names(ret, "prompt")
4 changes: 1 addition & 3 deletions R/sparklyr-sdf.R
@@ -2,9 +2,7 @@
sdf_schema.tbl_pyspark <- function(
x,
expand_nested_cols = NULL,
expand_struct_cols = NULL
) {

expand_struct_cols = NULL) {
check_arg_supported(expand_nested_cols)
check_arg_supported(expand_struct_cols)
