Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(select source sync time): Select source time of day anchor #29284

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
fc53d68
Sync time model and UI updates.
phixMe Feb 26, 2025
71882da
Adding migration
phixMe Feb 26, 2025
6d64cb1
Adding calendar syncs.
phixMe Feb 27, 2025
9b51918
Update UI snapshots for `chromium` (3)
github-actions[bot] Feb 27, 2025
3aa5f4d
Update UI snapshots for `chromium` (2)
github-actions[bot] Feb 27, 2025
ed7e91a
Reverting back source change
phixMe Feb 27, 2025
7dc30e9
Add sync time of day update functionality for external data sources b…
phixMe Feb 27, 2025
7b8a7ed
Add local time toggle for data warehouse source sync time
phixMe Feb 27, 2025
78a5645
Adding local time to initial form.
phixMe Feb 27, 2025
28c4c50
Removing old UI element.
phixMe Feb 27, 2025
baa0bc4
Fixing the schedule, no jitter on non-midnight.
phixMe Feb 27, 2025
45b0011
Add support for sub-hour sync frequencies in Temporal scheduling
phixMe Feb 27, 2025
387701b
Update "Sync Time of Day" label to "First Sync Time" in Data Warehous…
phixMe Feb 28, 2025
3e25855
Merge branch 'master' into feature(select-source-sync-time)
phixMe Feb 28, 2025
09455fa
Moving around migations.
phixMe Feb 28, 2025
868d25e
Update UI snapshots for `chromium` (2)
github-actions[bot] Feb 28, 2025
132522a
Self code review updates
phixMe Feb 28, 2025
cb7061c
Update UI snapshots for `chromium` (2)
github-actions[bot] Feb 28, 2025
5e75a8c
Merge remote-tracking branch 'origin/feature(select-source-sync-time)…
phixMe Feb 28, 2025
5717cd5
Fixing formatting
phixMe Feb 28, 2025
e7dab9e
Fixing lint errors
phixMe Feb 28, 2025
81815d6
Updating test
phixMe Feb 28, 2025
2241c4f
Update UI snapshots for `chromium` (2)
github-actions[bot] Feb 28, 2025
d03483d
Merge branch 'master' into feature(select-source-sync-time)
phixMe Feb 28, 2025
c32cc37
Update UI snapshots for `chromium` (2)
github-actions[bot] Feb 28, 2025
d88ffd9
Update UI snapshots for `chromium` (2)
github-actions[bot] Feb 28, 2025
2532234
Fix test
phixMe Feb 28, 2025
4ff580d
Merge remote-tracking branch 'origin/feature(select-source-sync-time)…
phixMe Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
46 changes: 43 additions & 3 deletions frontend/src/scenes/data-warehouse/external/forms/SchemaForm.tsx
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { LemonButton, LemonCheckbox, LemonModal, LemonTable } from '@posthog/lemon-ui'
import { LemonButton, LemonCheckbox, LemonInput, LemonModal, LemonSwitch, LemonTable } from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { dayjs } from 'lib/dayjs'

import { sourceWizardLogic } from '../../new/sourceWizardLogic'
import { SyncMethodForm } from './SyncMethodForm'

export default function SchemaForm(): JSX.Element {
const { toggleSchemaShouldSync, openSyncMethodModal } = useActions(sourceWizardLogic)
const { databaseSchema } = useValues(sourceWizardLogic)
const { toggleSchemaShouldSync, openSyncMethodModal, updateSyncTimeOfDay, setIsLocalTime } =
useActions(sourceWizardLogic)
const { databaseSchema, isLocalTime } = useValues(sourceWizardLogic)

return (
<>
Expand Down Expand Up @@ -50,7 +52,45 @@ export default function SchemaForm(): JSX.Element {
return schema.rows != null ? schema.rows : 'Unknown'
},
},
{
title: (
<div className="flex items-center gap-2">
<span>First Sync Time</span>
<div className="flex items-center gap-1">
<span>UTC</span>
<LemonSwitch checked={isLocalTime} onChange={setIsLocalTime} />
<span>{dayjs().format('z')}</span>
</div>
</div>
),
key: 'sync_time_of_day_local',
render: function RenderSyncTimeOfDayLocal(_, schema) {
const utcTime = schema.sync_time_of_day || '00:00:00'
const localTime = isLocalTime
? dayjs
.utc(`${dayjs().format('YYYY-MM-DD')}T${utcTime}`)
.local()
.format('HH:mm:00')
: utcTime

return (
<LemonInput
type="time"
disabled={!schema.should_sync}
value={localTime.substring(0, 5)}
onChange={(value) => {
const newValue = `${value}:00`
const utcValue = isLocalTime
? dayjs(`${dayjs().format('YYYY-MM-DD')}T${newValue}`)
.utc()
.format('HH:mm:00')
: newValue
updateSyncTimeOfDay(schema, utcValue)
}}
/>
)
},
},
{
key: 'sync_type',
title: 'Sync method',
Expand Down
18 changes: 18 additions & 0 deletions frontend/src/scenes/data-warehouse/new/sourceWizardLogic.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,11 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
setManualLinkingProvider: (provider: ManualLinkSourceType) => ({ provider }),
openSyncMethodModal: (schema: ExternalDataSourceSyncSchema) => ({ schema }),
cancelSyncMethodModal: true,
updateSyncTimeOfDay: (schema: ExternalDataSourceSyncSchema, syncTimeOfDay: string) => ({
schema,
syncTimeOfDay,
}),
setIsLocalTime: (isLocalTime: boolean) => ({ isLocalTime }),
}),
connect({
values: [
Expand Down Expand Up @@ -878,6 +883,12 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
should_sync: s.table === schema.table ? shouldSync : s.should_sync,
}))
},
updateSyncTimeOfDay: (state, { schema, syncTimeOfDay }) => {
return state.map((s) => ({
...s,
sync_time_of_day: s.table === schema.table ? syncTimeOfDay : s.sync_time_of_day,
}))
},
updateSchemaSyncType: (state, { schema, syncType, incrementalField, incrementalFieldType }) => {
return state.map((s) => ({
...s,
Expand Down Expand Up @@ -940,6 +951,12 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
}),
},
],
isLocalTime: [
false as boolean,
{
setIsLocalTime: (_, { isLocalTime }) => isLocalTime,
},
],
}),
selectors({
isManualLinkingSelected: [(s) => [s.selectedConnector], (selectedConnector): boolean => !selectedConnector],
Expand Down Expand Up @@ -1132,6 +1149,7 @@ export const sourceWizardLogic = kea<sourceWizardLogicType>([
sync_type: schema.sync_type,
incremental_field: schema.incremental_field,
incremental_field_type: schema.incremental_field_type,
sync_time_of_day: schema.sync_time_of_day,
})),
},
})
Expand Down
47 changes: 46 additions & 1 deletion frontend/src/scenes/data-warehouse/settings/source/Schemas.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
LemonButton,
LemonInput,
LemonModal,
LemonSelect,
LemonSkeleton,
Expand All @@ -13,6 +14,7 @@ import {
} from '@posthog/lemon-ui'
import { BindLogic, useActions, useValues } from 'kea'
import { TZLabel } from 'lib/components/TZLabel'
import { dayjs } from 'lib/dayjs'
import { More } from 'lib/lemon-ui/LemonButton/More'
import { useEffect } from 'react'
import { defaultQuery } from 'scenes/data-warehouse/utils'
Expand Down Expand Up @@ -53,7 +55,8 @@ const StatusTagSetting: Record<string, LemonTagType> = {
}

export const SchemaTable = ({ schemas, isLoading }: SchemaTableProps): JSX.Element => {
const { updateSchema, reloadSchema, resyncSchema } = useActions(dataWarehouseSourceSettingsLogic)
const { updateSchema, reloadSchema, resyncSchema, setIsLocalTime } = useActions(dataWarehouseSourceSettingsLogic)
const { isLocalTime } = useValues(dataWarehouseSourceSettingsLogic)
const { schemaReloadingById } = useValues(dataWarehouseSettingsLogic)

return (
Expand Down Expand Up @@ -96,6 +99,45 @@ export const SchemaTable = ({ schemas, isLoading }: SchemaTableProps): JSX.Eleme
)
},
},
{
title: (
<div className="flex items-center gap-2">
<span>First Sync Time</span>
<div className="flex items-center gap-1">
<span>UTC</span>
<LemonSwitch checked={isLocalTime} onChange={setIsLocalTime} />
<span>{dayjs().format('z')}</span>
</div>
</div>
),
key: 'sync_time_of_day_local',
render: function RenderSyncTimeOfDayLocal(_, schema) {
const utcTime = schema.sync_time_of_day || '00:00:00'
const localTime = isLocalTime
? dayjs
.utc(`${dayjs().format('YYYY-MM-DD')}T${utcTime}`)
.local()
.format('HH:mm:00')
: utcTime

return (
<LemonInput
type="time"
disabled={!schema.should_sync}
value={localTime.substring(0, 5)}
onChange={(value) => {
const newValue = `${value}:00`
const utcValue = isLocalTime
? dayjs(`${dayjs().format('YYYY-MM-DD')}T${newValue}`)
.utc()
.format('HH:mm:00')
: newValue
updateSchema({ ...schema, sync_time_of_day: utcValue })
}}
/>
)
},
},
{
title: 'Sync method',
key: 'incremental',
Expand Down Expand Up @@ -333,6 +375,7 @@ const SyncMethodModal = ({ schema }: { schema: ExternalDataSourceSchema }): JSX.
table: currentSyncMethodModalSchema.name,
should_sync: currentSyncMethodModalSchema.should_sync,
sync_type: currentSyncMethodModalSchema.sync_type,
sync_time_of_day: currentSyncMethodModalSchema.sync_time_of_day ?? '00:00:00',
incremental_field: currentSyncMethodModalSchema.incremental_field ?? null,
incremental_field_type: currentSyncMethodModalSchema.incremental_field_type ?? null,
incremental_available: !!schemaIncrementalFields.length,
Expand All @@ -350,6 +393,7 @@ const SyncMethodModal = ({ schema }: { schema: ExternalDataSourceSchema }): JSX.
sync_type: syncType,
incremental_field: null,
incremental_field_type: null,
sync_time_of_day: currentSyncMethodModalSchema.sync_time_of_day ?? '00:00:00',
})
} else {
updateSchema({
Expand All @@ -358,6 +402,7 @@ const SyncMethodModal = ({ schema }: { schema: ExternalDataSourceSchema }): JSX.
sync_type: syncType,
incremental_field: incrementalField,
incremental_field_type: incrementalFieldType,
sync_time_of_day: currentSyncMethodModalSchema.sync_time_of_day ?? '00:00:00',
})
}
}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export const dataWarehouseSourceSettingsLogic = kea<dataWarehouseSourceSettingsL
reloadSchema: (schema: ExternalDataSourceSchema) => ({ schema }),
resyncSchema: (schema: ExternalDataSourceSchema) => ({ schema }),
setCanLoadMoreJobs: (canLoadMoreJobs: boolean) => ({ canLoadMoreJobs }),
setIsLocalTime: (isLocalTime: boolean) => ({ isLocalTime }),
}),
loaders(({ actions, values }) => ({
source: [
Expand All @@ -40,7 +41,9 @@ export const dataWarehouseSourceSettingsLogic = kea<dataWarehouseSourceSettingsL
clonedSource.schemas[schemaIndex] = schema
actions.loadSourceSuccess(clonedSource)

const updatedSchema = await api.externalDataSchemas.update(schema.id, schema)
const updatedSchema = await api.externalDataSchemas.update(schema.id, {
...schema,
})

const source = values.source
if (schemaIndex !== undefined) {
Expand Down Expand Up @@ -100,6 +103,12 @@ export const dataWarehouseSourceSettingsLogic = kea<dataWarehouseSourceSettingsL
setSourceId: () => true,
},
],
isLocalTime: [
false as boolean,
{
setIsLocalTime: (_, { isLocalTime }) => isLocalTime,
},
],
})),
selectors({
sourceFieldConfig: [
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4240,6 +4240,7 @@ export interface ExternalDataSourceSyncSchema {
table: string
rows?: number | null
should_sync: boolean
sync_time_of_day: string | null
incremental_field: string | null
incremental_field_type: string | null
sync_type: 'full_refresh' | 'incremental' | null
Expand All @@ -4251,6 +4252,7 @@ export interface ExternalDataSourceSchema extends SimpleExternalDataSourceSchema
table?: SimpleDataWarehouseTable
incremental: boolean
sync_type: 'incremental' | 'full_refresh' | null
sync_time_of_day: string | null
status?: string
latest_error: string | null
incremental_field: string | null
Expand Down
17 changes: 17 additions & 0 deletions posthog/migrations/0680_externaldataschema_sync_time_of_day.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generated by Django 4.2.18 on 2025-02-26 18:47

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0679_feature_flag_concurrency"),
]

operations = [
migrations.AddField(
model_name="externaldataschema",
name="sync_time_of_day",
field=models.TimeField(blank=True, help_text="Time of day to run the sync (UTC)", null=True),
),
]
2 changes: 1 addition & 1 deletion posthog/migrations/max_migration.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0679_feature_flag_concurrency
0680_externaldataschema_sync_time_of_day
15 changes: 14 additions & 1 deletion posthog/warehouse/api/external_data_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class ExternalDataSchemaSerializer(serializers.ModelSerializer):
incremental_field_type = serializers.SerializerMethodField(read_only=True)
sync_frequency = serializers.SerializerMethodField(read_only=True)
status = serializers.SerializerMethodField(read_only=True)
sync_time_of_day = serializers.SerializerMethodField(read_only=True)

class Meta:
model = ExternalDataSchema
Expand All @@ -68,6 +69,7 @@ class Meta:
"incremental_field",
"incremental_field_type",
"sync_frequency",
"sync_time_of_day",
]

read_only_fields = [
Expand Down Expand Up @@ -109,6 +111,9 @@ def get_table(self, schema: ExternalDataSchema) -> Optional[dict]:
def get_sync_frequency(self, schema: ExternalDataSchema):
return sync_frequency_interval_to_sync_frequency(schema.sync_frequency_interval)

def get_sync_time_of_day(self, schema: ExternalDataSchema):
return schema.sync_time_of_day

def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) -> ExternalDataSchema:
data = self.context["request"].data

Expand Down Expand Up @@ -154,7 +159,9 @@ def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) -

should_sync = validated_data.get("should_sync", None)
sync_frequency = data.get("sync_frequency", None)
sync_time_of_day = data.get("sync_time_of_day", None)
was_sync_frequency_updated = False
was_sync_time_of_day_updated = False

if sync_frequency:
sync_frequency_interval = sync_frequency_to_sync_frequency_interval(sync_frequency)
Expand All @@ -164,6 +171,12 @@ def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) -
validated_data["sync_frequency_interval"] = sync_frequency_interval
instance.sync_frequency_interval = sync_frequency_interval

if sync_time_of_day is not None:
if sync_time_of_day != instance.sync_time_of_day:
was_sync_time_of_day_updated = True
validated_data["sync_time_of_day"] = sync_time_of_day
instance.sync_time_of_day = sync_time_of_day

if should_sync is True and sync_type is None and instance.sync_type is None:
raise ValidationError("Sync type must be set up first before enabling schema")

Expand All @@ -178,7 +191,7 @@ def update(self, instance: ExternalDataSchema, validated_data: dict[str, Any]) -
if should_sync is True:
sync_external_data_job_workflow(instance, create=True)

if was_sync_frequency_updated:
if was_sync_frequency_updated or was_sync_time_of_day_updated:
sync_external_data_job_workflow(instance, create=False)

if trigger_refresh:
Expand Down
22 changes: 12 additions & 10 deletions posthog/warehouse/api/external_data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
is_incremental = sync_type == "incremental"
incremental_field = schema.get("incremental_field")
incremental_field_type = schema.get("incremental_field_type")
sync_time_of_day = schema.get("sync_time_of_day")

if is_incremental and incremental_field is None:
new_source_model.delete()
Expand All @@ -463,6 +464,7 @@ def create(self, request: Request, *args: Any, **kwargs: Any) -> Response:
source=new_source_model,
should_sync=schema.get("should_sync"),
sync_type=sync_type,
sync_time_of_day=sync_time_of_day,
sync_type_config=(
{
"incremental_field": incremental_field,
Expand Down Expand Up @@ -1014,7 +1016,7 @@ def database_schema(self, request: Request, *arg: Any, **kwargs: Any):
ExternalDataSource.Type.MSSQL,
]:
# Importing pymssql requires mssql drivers to be installed locally - see posthog/warehouse/README.md
from pymssql import OperationalError as MSSQLOperationalError
# from pymssql import OperationalError as MSSQLOperationalError

host = request.data.get("host", None)
port = request.data.get("port", None)
Expand Down Expand Up @@ -1116,17 +1118,17 @@ def database_schema(self, request: Request, *arg: Any, **kwargs: Any):
status=status.HTTP_400_BAD_REQUEST,
data={"message": exposed_error or get_generic_sql_error(source_type)},
)
except MSSQLOperationalError as e:
error_msg = " ".join(str(n) for n in e.args)
exposed_error = self._expose_mssql_error(error_msg)
# except MSSQLOperationalError as e:
# error_msg = " ".join(str(n) for n in e.args)
# exposed_error = self._expose_mssql_error(error_msg)

if exposed_error is None:
capture_exception(e)
# if exposed_error is None:
# capture_exception(e)

return Response(
status=status.HTTP_400_BAD_REQUEST,
data={"message": exposed_error or get_generic_sql_error(source_type)},
)
# return Response(
# status=status.HTTP_400_BAD_REQUEST,
# data={"message": exposed_error or get_generic_sql_error(source_type)},
# )
except BaseSSHTunnelForwarderError as e:
return Response(
status=status.HTTP_400_BAD_REQUEST,
Expand Down
Loading
Loading