Comparing changes

base repository: wfau/gaiadmpsetup
base: v0.1.1
head repository: wfau/gaiadmpsetup
compare: main

Commits on Jun 29, 2022

  1. .DS_Store banished!
     NigelHambly committed Jun 29, 2022 · 1289725
  2. d6d1ed1
  3. Fixed schema over-write bug; modified DR3 schema structures for consistency and alignment with DR3 bulk release products
     NigelHambly committed Jun 29, 2022 · 86868be

Commits on Jul 5, 2022

  1. 5cd6f90

Commits on Jul 7, 2022

  1. f22edb0
  2. Bug fixes in setup; DR3 demo dataset finalised: commented out epoch_photometry for now as the boolean arrays were incorrectly parsed from csv
     NigelHambly committed Jul 7, 2022 · a57f03a
  3. Merge pull request #4 from NigelHambly/main
     DR3 demo data set finalised
     stvoutsin authored Jul 7, 2022 · 8827426
  4. f039709
  5. Merge pull request #5 from NigelHambly/main
     Fix setup to switch DB context before checking table sets
     Zarquan authored Jul 7, 2022 · bf2f092

Commits on Aug 3, 2022

  1. fd039c6

Commits on Aug 9, 2022

  1. Merge pull request #7 from NigelHambly/main
     Fixed ALLWISE incorrect schema bug and included all XMs in DR3
     Zarquan authored Aug 9, 2022 · c6ec82a

Commits on Sep 2, 2022

  1. 647908b

Commits on Sep 7, 2022

  1. Merge pull request #8 from NigelHambly/main
     Fixed bug for parsing double-quoted boolean labels in CSVs
     stvoutsin authored Sep 7, 2022 · feacc54

Commits on Sep 21, 2022

  1. 82aa06b
  2. Merge pull request #10 from stvoutsin/bugfix/imports
     Fix import of gaiaedr3_pyspark_schema_structures
     stvoutsin authored Sep 21, 2022 · ec0310d

Commits on Sep 28, 2022

  1. 94ca8bd

Commits on Feb 7, 2023

  1. data store as env var
     akrause2014 committed Feb 7, 2023 · 2eb86e7

Commits on Feb 13, 2023

  1. Merge pull request #12 from NigelHambly/main
     Completed DR3 set up with all available tables
     Zarquan authored Feb 13, 2023 · e4ed86d

Commits on Mar 22, 2023

  1. 42f9549
  2. 5181708
  3. new version
     akrause2014 committed Mar 22, 2023 · c620c57
  4. fixed schema parameter
     akrause2014 committed Mar 22, 2023 · 103e2f2
  5. add missing slash
     akrause2014 committed Mar 22, 2023 · 10f50aa
  6. removed unused import
     akrause2014 committed Mar 22, 2023 · d054c1c

Commits on Mar 29, 2023

  1. Merge pull request #13 from akrause2014/main
     Configurable data location
     stvoutsin authored Mar 29, 2023 · 0a42e65
  2. Update version to 0.1.5
     stvoutsin authored Mar 29, 2023 · 8e12864
  3. Merge pull request #14 from stvoutsin/bugfix/update-version
     Update version to 0.1.5
     stvoutsin authored Mar 29, 2023 · 0236a28

Showing with 557 additions and 224 deletions.
  1. +1 −0 .gitignore
  2. +1 −0 gaiadmpconf/conf.py
  3. +63 −26 gaiadmpsetup/gaiadmpsetup.py
  4. +26 −14 gaiadmpsetup/gaiadmpstore.py
  5. +460 −178 gaiadmpsetup/gaiadr3_pyspark_schema_structures.py
  6. +4 −4 gaiadmpsetup/gaiaedr3_pyspark_schema_structures.py
  7. +2 −2 setup.py
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
.DS_Store
1 change: 1 addition & 0 deletions gaiadmpconf/conf.py
@@ -0,0 +1 @@
GAIA_DATA_LOCATION = 'file:///data/gaia/'
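
The single line above is the packaged default added by the "data store as env var" and "Configurable data location" work; gaiadmpsetup.py reads it as conf.GAIA_DATA_LOCATION (see the next file). A minimal sketch of how a deployment could layer an environment override on top of that default follows; the GAIA_DATA_LOCATION environment variable name and the os.environ fallback are assumptions for illustration and are not shown in this diff:

    import os

    from gaiadmpconf import conf

    def resolve_data_location():
        # Hypothetical helper: prefer an environment override, fall back to the
        # packaged default in gaiadmpconf/conf.py. The variable name is assumed.
        return os.environ.get('GAIA_DATA_LOCATION', conf.GAIA_DATA_LOCATION)

    data_store = resolve_data_location()  # e.g. 'file:///data/gaia/' when no override is set
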
89 changes: 63 additions & 26 deletions gaiadmpsetup/gaiadmpsetup.py
@@ -1,9 +1,29 @@
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
from pyspark.sql.utils import AnalysisException

from . import gaiaedr3_pyspark_schema_structures as edr3
from . import gaiadr3_pyspark_schema_structures as dr3
from .gaiadmpstore import *
from gaiadmpconf import conf

from urllib.parse import urlsplit, unquote_plus
from pathlib import Path

spark = SparkSession.builder.getOrCreate()

class Location(object):

def __init__(self, url):
(scheme, net_loc, path, _, _) = urlsplit(url)
path = Path(unquote_plus(path))
self.parts = (scheme, net_loc, path)

def __eq__(self, other):
return self.parts == other.parts

def __hash__(self):
return hash(self.parts)

spark = SparkSession.builder.getOrCreate()

@@ -12,43 +32,60 @@ class GaiaDMPSetup:
Prepare the PySpark env for GaiaDMP
"""

databases = {
'gaiaedr3': edr3,
'gaiadr3': dr3,
}

def __init__(self):
pass

@staticmethod
def setup():

def tablesExist():
actual_tables = [i.name for i in spark.catalog.listTables()]
expected_tables = edr3.table_dict.keys() # | dr3.table_dict.keys() TODO add in DR3 tables expected once loaded
check = all(item in actual_tables for item in expected_tables)
return check

if not tablesExist():
# database name to create
database = "gaiaedr3"
data_store = conf.GAIA_DATA_LOCATION

# create the database and switch the current SQL database context to it (from default)
spark.sql("create database " + database)
spark.sql("use " + database)
def tablesExist(expected_tables, database):

check = False

try:

spark.sql("use " + database)
actual_tables = [i.name for i in spark.catalog.listTables()]
check = all(item in actual_tables for item in expected_tables)

except AnalysisException: pass

return check

# create the tables against their corresponding file sets and schema
for table_key in edr3.table_dict.keys():
folder_path = edr3.table_dict[table_key][1]
schemas = edr3.table_dict[table_key][0]
reattachParquetFileResourceToSparkContext(table_key, data_store + folder_path, schemas)
def location_changed(expected_location, schema):
check = False
for table_key in schema.table_dict.keys():
location = spark.sql(f"desc formatted {table_key}").filter("col_name=='Location'").collect()[0].data_type
folder_path = schema.table_dict[table_key][1]
if Location(location) != Location(expected_location + folder_path):
check = True
# only compare the first table, assuming they are all the same
break
return check

# ... similarly for Gaia DR3
database = "gaiadr3"
spark.sql("create database " + database)
spark.sql("use " + database)
for database, schema in GaiaDMPSetup.databases.items():
if not tablesExist(schema.table_dict.keys(), database) or location_changed(data_store, schema):

# TODO create the tables against their corresponding file sets and schema
#for table_key in dr3.table_dict.keys():
# folder_path = dr3.table_dict[table_key][0]
# schema = dr3.table_dict[table_key][1]
# reattachParquetFileResourceToSparkContext(table_key, data_store + folder_path, *schema)
# create the database and switch the current SQL database context to it (from default)
spark.sql("create database if not exists " + database)
spark.sql("use " + database)

# create the tables against their corresponding file sets and schema
for table_key in schema.table_dict.keys():
folder_path = schema.table_dict[table_key][1]
schemas = schema.table_dict[table_key][0]
pk = schema.table_dict[table_key][2]
reattachParquetFileResourceToSparkContext(table_key, data_store + folder_path, schemas, cluster_key = pk, sort_key = pk)

# finally always leave the PySpark SQL context in the most recent Gaia DR3 database
spark.sql("use gaiadr3")

GaiaDMPSetup.setup()
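
The Location helper introduced above normalises a storage URL into (scheme, network location, path) so that location_changed() does not flag a rebuild for URLs that differ only in percent-encoding or a trailing slash. A small illustration of the comparison it enables, reusing the class exactly as defined in the diff (the example URLs are made up):

    from urllib.parse import urlsplit, unquote_plus
    from pathlib import Path

    class Location(object):
        # Copied from gaiadmpsetup.py above: split the URL and normalise the path.
        def __init__(self, url):
            (scheme, net_loc, path, _, _) = urlsplit(url)
            path = Path(unquote_plus(path))
            self.parts = (scheme, net_loc, path)
        def __eq__(self, other):
            return self.parts == other.parts
        def __hash__(self):
            return hash(self.parts)

    print(Location('file:///data/gaia/GEDR3') == Location('file:///data/gaia/GEDR3/'))  # True: Path drops the trailing slash
    print(Location('file:///data/gaia/') == Location('hdfs://host/data/gaia/'))         # False: scheme and net_loc differ
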

40 changes: 26 additions & 14 deletions gaiadmpsetup/gaiadmpstore.py
@@ -8,9 +8,6 @@

spark = SparkSession.builder.getOrCreate()

# root data store path: TODO change this to the official one when established.
data_store = "file:////data/gaia/" # "file:////user/nch/PARQUET/REPARTITIONED/"

# default key by which to bucket and sort: Gaia catalogue source UID
default_key = "source_id"

@@ -43,7 +40,7 @@ def saveToBinnedParquet(df, outputParquetPath, name, mode = "error", buckets = N
.saveAsTable(name)

def reattachParquetFileResourceToSparkContext(table_name, file_path, schema_structures, cluster_key = default_key, sort_key = default_key, buckets = NUM_BUCKETS):
"""
'''
Creates a Spark (in-memory) meta-record for the table resource specified for querying
through the PySpark SQL API.
@@ -69,7 +66,7 @@ def reattachParquetFileResourceToSparkContext(table_name, file_path, schema_stru
Default is Gaia catalogue source UID (= source_id).
buckets : int (optional)
Number of buckets into which the data is organised.
"""
'''

# put in the columns and their data types ...
table_create_statement = "CREATE TABLE `" + table_name + "` ("
@@ -93,6 +90,8 @@ def reattachParquetFileResourceToSparkContext(table_name, file_path, schema_stru
# create the table resource
spark.sql(table_create_statement)

import copy

def create_interim_schema_for_csv(schema_structure):
'''
Takes a schema StructType() and substitutes all array types as a string in order
@@ -115,13 +114,14 @@ def create_interim_schema_for_csv(schema_structure):

# iterate over the schema, copying in everything and substituting strings for any arrays
for field in schema_structure:
if type(field.dataType) == ArrayType: field.dataType = StringType()
interim_structure.add(field)
interim_field = copy.deepcopy(field)
if type(interim_field.dataType) == ArrayType: interim_field.dataType = StringType()
interim_structure.add(interim_field)

return interim_structure

def cast_to_array(data_frame : DataFrame, column_name : str, data_type : DataType):
"""
'''
Casts the specified string column in the given data frame into an
array with the specified data type. Assumes the string column contains
comma-separated values in plain text delimited by braces (which are
@@ -143,13 +143,17 @@ def cast_to_array(data_frame : DataFrame, column_name : str, data_type : DataTyp
Returns:
--------
a new data frame containing the requested modification
"""
'''

# a temporary working column name for the array
temporary_column_name = column_name + '_array_data'

# reformat the string csv data as an array of the specified type
data_frame = data_frame.withColumn(temporary_column_name, f.split(f.col(column_name).substr(f.lit(2), f.length(f.col(column_name)) - 2), ',').cast(data_type))
if isinstance(data_type.elementType, BooleanType):
# ... need to allow for the double quoted boolean labels
data_frame = data_frame.withColumn(temporary_column_name, f.split(f.col(column_name).substr(f.lit(3), f.length(f.col(column_name)) - 3), '","').cast(data_type))
else:
data_frame = data_frame.withColumn(temporary_column_name, f.split(f.col(column_name).substr(f.lit(2), f.length(f.col(column_name)) - 2), ',').cast(data_type))

# drop the original string column to save space
data_frame = data_frame.drop(column_name)
@@ -161,7 +165,7 @@ def cast_to_array(data_frame : DataFrame, column_name : str, data_type : DataTyp


def reorder_columns(data_frame : DataFrame, data_structure : StructType):
"""
'''
Reorder the columns according to the Gaia archive public schema and so that
the parquet files can be re-attached against that standard schema.
@@ -171,7 +175,11 @@ def reorder_columns(data_frame : DataFrame, data_structure : StructType):
The PySpark data frame instance to be operated on
data_structure : StructType()
The PySpark data structure containing the required schema definition
"""
Returns:
--------
a new data frame with columns re-ordered according to that in the schema
'''

# use the schema to define the column order
ordered_columns = [field.name for field in data_structure]
@@ -181,7 +189,7 @@ def reorder_columns(data_frame : DataFrame, data_structure : StructType):


def cast_all_arrays(data_frame : DataFrame, data_structure : StructType):
"""
'''
Given an interim data frame read from csv and containing arrays in
plain text string representation, cycles over the schema transforming
all strings associated with arrays into the required primitive type.
@@ -192,7 +200,11 @@ def cast_all_arrays(data_frame : DataFrame, data_structure : StructType):
The PySpark data frame instance to be operated on
data_structure : StructType()
The PySpark data structure containing the required schema definition
"""
Returns:
--------
a new data frame with all arrays expressed as interim strings cast to their array structure type
'''

# cycle over the defined fields looking for arrays
for field in data_structure:
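
The change to cast_to_array above implements the "double-quoted boolean labels" fix from the commit history: boolean arrays arrive from the CSVs as strings such as {"false","true"}, so the boolean branch uses different substring offsets and a '","' separator before casting. A simplified, self-contained sketch of the same idea, using regexp_replace to strip the braces and quotes instead of the substring arithmetic in the diff (the sample value and column name are illustrative only):

    from pyspark.sql import SparkSession, functions as f
    from pyspark.sql.types import ArrayType, BooleanType

    spark = SparkSession.builder.getOrCreate()

    # One row with a quoted boolean array as it might be read from CSV (column name is made up).
    df = spark.createDataFrame([('{"false","true"}',)], ['variability_flag'])

    # Strip '{', '}' and '"', split on commas, then cast the string array to array<boolean>.
    df = df.withColumn(
        'variability_flag_array',
        f.split(f.regexp_replace('variability_flag', '[{}"]', ''), ',').cast(ArrayType(BooleanType()))
    )

    df.show(truncate = False)  # [false, true]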