Merge pull request #508 from slingdata-io/v1.4.3
V1.4.3
flarco authored Feb 14, 2025
2 parents 6c93883 + 6b2e77d commit 108c3ba
Showing 10 changed files with 95 additions and 76 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -28,6 +28,7 @@ core/dbio/filesys/test/dataset1M.csv
core/dbio/filesys/test/dataset100k.csv
cmd/sling/tests/suite/
cmd/sling/tests/replications/r.test.yaml
cmd/sling/tests/pipelines/p.test.yaml

*..go
*__test.go
4 changes: 0 additions & 4 deletions cmd/sling/sling_cli.go
@@ -517,10 +517,6 @@ func cliInit(done chan struct{}) int {
setSentry()
ok, err := g.CliProcess()

if time.Now().UnixMicro()%20 == 0 {
defer SlingMedia.PrintFollowUs()
}

if err != nil || env.TelMap["error"] != nil {
if err == nil && env.TelMap["error"] != nil {
err = g.Error(cast.ToString(env.TelMap["error"]))
20 changes: 0 additions & 20 deletions cmd/sling/sling_media.go

This file was deleted.

39 changes: 23 additions & 16 deletions core/dbio/database/database_sqlserver.go
@@ -345,6 +345,7 @@ func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.D
quoteRep := `$~q$~`
newlRep := `$~n$~`
carrRep := `$~r$~`
emptyRep := `$~e$~`
postUpdateCol := map[int]uint64{}

// transformation to correctly post process quotes, newlines, and delimiter afterwards
@@ -369,12 +370,12 @@ func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.D
nRow[i] = strings.ReplaceAll(
nRow[i].(string), "\n", newlRep,
)
// bcp treats empty space as null
if !ds.Sp.Config.EmptyAsNull && v == "" {
nRow[i] = emptyRep
}
if nRow[i].(string) != val.(string) {
if _, ok := postUpdateCol[i]; ok {
postUpdateCol[i]++
} else {
postUpdateCol[i] = 1
}
postUpdateCol[i]++
}
default:
_ = v
@@ -451,25 +452,31 @@ func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.D
"delimiter", ",",
)
replExpr2 := g.R(
`REPLACE({replExpr1}, '{quoteRep}', '{quote}')`,
"replExpr1", replExpr1,
`REPLACE({replExpr}, '{quoteRep}', '{quote}')`,
"replExpr", replExpr1,
"quoteRep", quoteRep,
"quote", `"`,
)
replExpr3 := g.R(
`REPLACE({replExpr2}, '{newlRep}', {newl})`,
"replExpr2", replExpr2,
"newlRep", carrRep,
"newl", `CHAR(13)`,
`REPLACE({replExpr}, '{placeholder}', {newVal})`,
"replExpr", replExpr2,
"placeholder", carrRep,
"newVal", `CHAR(13)`,
)
replExpr4 := g.R(
`REPLACE({replExpr2}, '{newlRep}', {newl})`,
"replExpr2", replExpr3,
"newlRep", newlRep,
"newl", `CHAR(10)`,
`REPLACE({replExpr}, '{placeholder}', {newVal})`,
"replExpr", replExpr3,
"placeholder", newlRep,
"newVal", `CHAR(10)`,
)
replExpr5 := g.R(
`REPLACE({replExpr}, '{placeholder}', {newVal})`,
"replExpr", replExpr4,
"placeholder", emptyRep,
"newVal", `''`,
)
setCols = append(
setCols, fmt.Sprintf(`%s = %s`, col.Name, replExpr4),
setCols, fmt.Sprintf(`%s = %s`, col.Name, replExpr5),
)
}

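Context for the change above: bcp loads the staged CSV verbatim, so quotes and newlines are swapped for sentinel tokens before export and restored afterwards with nested T-SQL REPLACE calls in a post-load UPDATE. This commit adds a fourth sentinel, `$~e$~`, because bcp also loads empty strings as NULL. A minimal, self-contained sketch of the round trip follows; the sentinel constants mirror the diff, but `encode` and `decodeExpr` are illustrative names, not sling's API:

package main

import (
	"fmt"
	"strings"
)

// sentinel tokens, as in the diff
const (
	quoteRep = `$~q$~`
	newlRep  = `$~n$~`
	emptyRep = `$~e$~`
)

// encode prepares one field for the bcp staging file: quotes and
// newlines become sentinels, and the empty string becomes emptyRep
// so bcp does not load it as NULL.
func encode(v string) string {
	if v == "" {
		return emptyRep
	}
	v = strings.ReplaceAll(v, `"`, quoteRep)
	return strings.ReplaceAll(v, "\n", newlRep)
}

// decodeExpr builds the nested T-SQL REPLACE expression that undoes
// encode after the load, for use in an UPDATE ... SET clause.
func decodeExpr(col string) string {
	expr := fmt.Sprintf(`REPLACE(%s, '%s', '"')`, col, quoteRep)
	expr = fmt.Sprintf(`REPLACE(%s, '%s', CHAR(10))`, expr, newlRep)
	return fmt.Sprintf(`REPLACE(%s, '%s', '')`, expr, emptyRep)
}

func main() {
	fmt.Println(encode(""))         // $~e$~
	fmt.Println(decodeExpr("col1")) // nested REPLACE chain restoring the original value
}
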
46 changes: 25 additions & 21 deletions core/dbio/filesys/fs.go
@@ -1263,14 +1263,13 @@ func WriteDataflowViaDuckDB(fs FileSysClient, df *iop.Dataflow, uri string) (bw
sc.FileMaxBytes = cast.ToInt64(val)
}

if cast.ToInt64(fs.GetProp("FILE_MAX_ROWS")) > 0 {
g.Warn("splitting files with `file_max_rows` is not supported with duckdb, using `file_max_bytes = 16000000`")
sc.FileMaxBytes = 16000000
if val := cast.ToInt64(fs.GetProp("FILE_MAX_ROWS")); val > 0 {
sc.FileMaxRows = val
}

// merge into single stream to push into duckdb
duckSc := duck.DefaultCsvConfig()
duckSc.FileMaxRows = 0 // if we want to manually split by rows, multiple duck.Exec
duckSc.FileMaxRows = sc.FileMaxRows
streamPartChn, err := duck.DataflowToHttpStream(df, duckSc)
if err != nil {
return bw, g.Error(err)
@@ -1281,45 +1280,43 @@ func WriteDataflowViaDuckDB(fs FileSysClient, df *iop.Dataflow, uri string) (bw
fileFormat = InferFileFormat(uri)
}

streamDone := false
for streamPart := range streamPartChn {
if streamDone {
return bw, g.Error("stream has finished")
}

copyOptions := iop.DuckDbCopyOptions{
Format: fileFormat,
Compression: sc.Compression,
FileSizeBytes: sc.FileMaxBytes,
}

// if * is specified, set default FileSizeBytes,
if strings.Contains(uri, "*") && copyOptions.FileSizeBytes == 0 {
if strings.Contains(uri, "*") && copyOptions.FileSizeBytes == 0 && duckSc.FileMaxRows == 0 {
copyOptions.FileSizeBytes = 50 * 1024 * 1024 // 50MB default file size
}

// generate sql for export
switch fs.FsType() {
case dbio.TypeFileLocal:
localPath := duckURI
// copy files bytes recursively to target
if strings.Contains(duckURI, "*") {
duckURI = GetDeepestParent(duckURI) // get target folder, since split by files
duckURI = strings.TrimRight(duckURI, "/")
if strings.Contains(localPath, "*") {
localPath = GetDeepestParent(localPath) // get target folder, since split by files
localPath = strings.TrimRight(localPath, "/")
}

// create the parent folder if needed
if fs.FsType() == dbio.TypeFileLocal {
parent := duckURI
if sc.FileMaxBytes == 0 {
parent = path.Dir(duckURI)
}
parent := path.Dir(localPath)
if err = os.MkdirAll(parent, 0755); err != nil {
err = g.Error(err, "Could not create output folder")
return bw, err
}
if duckSc.FileMaxRows > 0 {
copyOptions.FileSizeBytes = 0 // since we are splitting by rows already
os.MkdirAll(localPath, 0755) // make root dir first
localPath = g.F("%s/data_%03d.parquet", localPath, streamPart.Index+1)
}
}

sql, err := duck.GenerateCopyStatement(streamPart.FromExpr, duckURI, copyOptions)
sql, err := duck.GenerateCopyStatement(streamPart.FromExpr, localPath, copyOptions)
if err != nil {
err = g.Error(err, "Could not generate duckdb copy statement")
return bw, err
@@ -1340,7 +1337,15 @@ func WriteDataflowViaDuckDB(fs FileSysClient, df *iop.Dataflow, uri string) (bw
}
default:
// copy to temp file locally, then upload
localPath := env.CleanWindowsPath(path.Join(folder, "output"))
localRoot := env.CleanWindowsPath(path.Join(folder, "output")) // could be file or dir
localPath := localRoot

if duckSc.FileMaxRows > 0 {
copyOptions.FileSizeBytes = 0 // since we are splitting by rows already
os.MkdirAll(localRoot, 0755) // make root dir first
localPath = g.F("%s/data_%03d.parquet", localRoot, streamPart.Index+1)
}

sql, err := duck.GenerateCopyStatement(streamPart.FromExpr, localPath, copyOptions)
if err != nil {
err = g.Error(err, "Could not generate duckdb copy statement")
@@ -1358,15 +1363,14 @@ func WriteDataflowViaDuckDB(fs FileSysClient, df *iop.Dataflow, uri string) (bw
uri = GetDeepestParent(uri) // get target folder, since split by files
}

written, err := CopyFromLocalRecursive(fs, localPath, uri)
written, err := CopyFromLocalRecursive(fs, localRoot, uri)
if err != nil {
err = g.Error(err, "Could not write to file")
return bw, err
}
bw += written
}

streamDone = true // only 1 reader batch
}

return
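
Net effect of the fs.go changes: `file_max_rows` is now honored with the duckdb engine (previously it was rejected with a warning and replaced by a 16 MB `file_max_bytes`). When splitting by rows, the byte-based splitting is disabled and each stream part lands in its own numbered file under the target folder. A rough sketch of that naming scheme, with hypothetical values for the staging root and part count:

package main

import (
	"fmt"
	"os"
	"path"
)

func main() {
	localRoot := path.Join(os.TempDir(), "output") // hypothetical staging root
	fileMaxRows := int64(100000)                   // from FILE_MAX_ROWS

	if fileMaxRows > 0 {
		// byte-based splitting is turned off; rows already bound each part
		if err := os.MkdirAll(localRoot, 0755); err != nil {
			panic(err)
		}
		for partIndex := 0; partIndex < 3; partIndex++ {
			// one numbered file per stream part, as in the diff
			localPath := fmt.Sprintf("%s/data_%03d.parquet", localRoot, partIndex+1)
			fmt.Println("would COPY TO", localPath)
		}
	}
}
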
20 changes: 20 additions & 0 deletions core/dbio/iop/datastream.go
@@ -2189,6 +2189,16 @@ func (ds *Datastream) NewJsonReaderChnl(sc StreamConfig) (readerChn chan *io.Pip

rec := g.M()
for i, val := range row0 {
if sVal, ok := val.(string); ok && batch.Columns[i].Type.IsJSON() {
if looksLikeJson(sVal) {
var v any
if err := g.Unmarshal(sVal, &v); err == nil {
val = v
}
} else if sVal == "null" {
val = nil
}
}
rec[fields[i]] = val
}

@@ -2262,6 +2272,16 @@ func (ds *Datastream) NewJsonLinesReaderChnl(sc StreamConfig) (readerChn chan *i

rec := g.M()
for i, val := range row0 {
if sVal, ok := val.(string); ok && batch.Columns[i].Type.IsJSON() {
if looksLikeJson(sVal) {
var v any
if err := g.Unmarshal(sVal, &v); err == nil {
val = v
}
} else if sVal == "null" {
val = nil
}
}
rec[fields[i]] = val
}

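Both JSON writer paths gain the same normalization: a string value in a JSON-typed column that looks like a JSON document is unmarshaled so it is emitted as a nested object rather than an escaped string, and the literal string "null" becomes a real null. A minimal sketch using only the standard library; `looksLikeJSON` below is a stand-in heuristic, since the diff calls sling's own `looksLikeJson`, whose exact check is not shown:

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// looksLikeJSON is a stand-in heuristic: an object/array prefix
// check captures the intent, though sling's helper may differ.
func looksLikeJSON(s string) bool {
	t := strings.TrimSpace(s)
	return strings.HasPrefix(t, "{") || strings.HasPrefix(t, "[")
}

// normalize mirrors the added block: it only applies to JSON-typed columns.
func normalize(val any, isJSONCol bool) any {
	sVal, ok := val.(string)
	if !ok || !isJSONCol {
		return val
	}
	if looksLikeJSON(sVal) {
		var v any
		if err := json.Unmarshal([]byte(sVal), &v); err == nil {
			return v // emit as nested object, not an escaped string
		}
	} else if sVal == "null" {
		return nil // emit as JSON null
	}
	return val
}

func main() {
	rec := map[string]any{"payload": normalize(`{"a":1}`, true)}
	b, _ := json.Marshal(rec)
	fmt.Println(string(b)) // {"payload":{"a":1}}
}
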
32 changes: 20 additions & 12 deletions core/dbio/iop/stream_processor.go
@@ -714,19 +714,27 @@ func (sp *StreamProcessor) CastVal(i int, val interface{}, col *Column) interfac
case col.Type.IsInteger():
iVal, err := cast.ToInt64E(val)
if err != nil {
fVal, err := sp.toFloat64E(val)
if err != nil || sp.ds == nil {
// is string
sp.ds.ChangeColumn(i, StringType)
cs.StringCnt++
cs.TotalCnt++
sVal = cast.ToString(val)
sp.rowChecksum[i] = uint64(len(sVal))
return sVal
// if value is boolean casted as int
switch val {
case "true", true:
iVal = 1
case "false", false:
iVal = 0
default:
fVal, err := sp.toFloat64E(val)
if err != nil || sp.ds == nil {
// is string
sp.ds.ChangeColumn(i, StringType)
cs.StringCnt++
cs.TotalCnt++
sVal = cast.ToString(val)
sp.rowChecksum[i] = uint64(len(sVal))
return sVal
}
// is decimal
sp.ds.ChangeColumn(i, DecimalType)
return fVal
}
// is decimal
sp.ds.ChangeColumn(i, DecimalType)
return fVal
}

if iVal > cs.Max {
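
The integer cast in CastVal now recognizes boolean-like values first, so "true"/true map to 1 and "false"/false to 0 instead of demoting the whole column to string. A self-contained sketch of the fallback order (boolean, then integer, then decimal, then string), using strconv in place of the cast package the diff uses:

package main

import (
	"fmt"
	"strconv"
)

// castInt sketches the fallback order: bool-ish first, then int,
// then decimal, then give up (the column would become string).
func castInt(val any) (any, string) {
	switch val {
	case "true", true:
		return int64(1), "integer"
	case "false", false:
		return int64(0), "integer"
	}
	s := fmt.Sprint(val)
	if i, err := strconv.ParseInt(s, 10, 64); err == nil {
		return i, "integer"
	}
	if f, err := strconv.ParseFloat(s, 64); err == nil {
		return f, "decimal" // column type changes to decimal
	}
	return s, "string" // column type changes to string
}

func main() {
	for _, v := range []any{"true", false, "42", "3.14", "abc"} {
		out, typ := castInt(v)
		fmt.Printf("%v -> %v (%s)\n", v, out, typ)
	}
}
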
4 changes: 2 additions & 2 deletions core/sling/config.go
@@ -966,9 +966,9 @@ func (cfg *Config) GetFormatMap() (m map[string]any, err error) {
}
}

if len(blankKeys) > 0 {
if len(blankKeys) > 0 && g.IsDebug() {
// return g.Error("blank values for: %s", strings.Join(blankKeys, ", "))
g.Warn("Could not successfully get format values. Blank values for: %s", strings.Join(blankKeys, ", "))
g.Trace("Could not successfully get format values. Blank values for: %s", strings.Join(blankKeys, ", "))
}

now := time.Now()
3 changes: 3 additions & 0 deletions core/sling/pipeline.go
@@ -62,6 +62,9 @@ func LoadPipelineConfig(content string) (pipeline *Pipeline, err error) {
Env = lo.Ternary(Env == nil, map[string]any{}, Env)
content = g.Rm(content, Env)

// set env
pipeline.Env = Env

// parse again
m = g.M()
err = yaml.Unmarshal([]byte(content), &m)
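
LoadPipelineConfig already parsed in two passes, reading `env` from a first unmarshal, rendering it into the raw content, then parsing again; the added lines keep the parsed map on `pipeline.Env` instead of discarding it. A rough sketch of that two-pass shape, with a trivial `render` standing in for `g.Rm`:

package main

import (
	"fmt"
	"strings"

	"gopkg.in/yaml.v3"
)

// render is a trivial stand-in for g.Rm: it substitutes {key} tokens from env.
func render(content string, env map[string]any) string {
	for k, v := range env {
		content = strings.ReplaceAll(content, "{"+k+"}", fmt.Sprint(v))
	}
	return content
}

func main() {
	content := `
env:
  NAME: orders
steps:
  - log: running {NAME}
`
	// pass 1: pull out env
	m := map[string]any{}
	if err := yaml.Unmarshal([]byte(content), &m); err != nil {
		panic(err)
	}
	env, _ := m["env"].(map[string]any)

	// render env into the raw content, then parse again
	content = render(content, env)
	m = map[string]any{}
	if err := yaml.Unmarshal([]byte(content), &m); err != nil {
		panic(err)
	}

	// the fix: keep env on the pipeline instead of dropping it
	pipelineEnv := env
	fmt.Println(pipelineEnv, m["steps"])
}
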
2 changes: 1 addition & 1 deletion go.mod
@@ -23,7 +23,7 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.17.0
github.com/fatih/color v1.17.0
github.com/flarco/bigquery v0.0.9
github.com/flarco/g v0.1.135
github.com/flarco/g v0.1.136
github.com/getsentry/sentry-go v0.27.0
github.com/go-sql-driver/mysql v1.8.1
github.com/gobwas/glob v0.2.3
