Skip to content

Commit

Permalink
JDBC: timezone fix (#515)
Browse files Browse the repository at this point in the history
* JDBC: timezone fix

* JDBC: timezone fix

* JDBC: timezone fix
  • Loading branch information
dariakharlan authored Jan 17, 2025
1 parent 2dec80a commit 7bcc798
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 19 deletions.
14 changes: 8 additions & 6 deletions agent/src/agent/pipeline/config/jython_scripts/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
sdc.importUnlock()


pipeline_timezone = pytz.timezone(sdc.userParams['TIMEZONE'])

def get_interval():
return int(sdc.userParams['INTERVAL_IN_SECONDS'])

Expand All @@ -24,8 +26,8 @@ def get_interval_missing_data():


def get_now():
if sdc.userParams['WATERMARK_IN_LOCAL_TIMEZONE'] == 'True':
now = int(time.mktime(datetime.now(pytz.timezone(sdc.userParams['TIMEZONE'])).timetuple()))
if sdc.userParams['TIMEZONE'] != 'UTC':
now = int(time.mktime(datetime.now(pipeline_timezone).timetuple()))
else:
now = int(time.time())
return now
Expand All @@ -36,7 +38,7 @@ def get_now_with_delay():


def to_timestamp(date):
epoch = datetime(1970, 1, 1)
epoch = datetime(1970, 1, 1).replace(tzinfo=pytz.UTC)
return int((date - epoch).total_seconds())


Expand Down Expand Up @@ -88,9 +90,9 @@ def main():
if sdc.lastOffsets.containsKey(entityName):
offset = int(float(sdc.lastOffsets.get(entityName)))
elif sdc.userParams['INITIAL_OFFSET']:
offset = to_timestamp(datetime.strptime(sdc.userParams['INITIAL_OFFSET'], '%d/%m/%Y %H:%M'))
offset = int(float(sdc.userParams['INITIAL_OFFSET']))
else:
offset = to_timestamp(datetime.utcnow().replace(second=0, microsecond=0) - interval)
offset = get_now() - get_interval()

sdc.log.info('OFFSET: ' + str(offset))

Expand Down Expand Up @@ -120,4 +122,4 @@ def main():
cur_batch.process(entityName, str(offset))


main()
main()
2 changes: 1 addition & 1 deletion agent/src/agent/pipeline/config/stages/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_config(self) -> dict:
pass

def get_initial_timestamp(self) -> datetime:
midnight = datetime.now(pytz.timezone('UTC')).replace(hour=0, minute=0, second=0, microsecond=0)
midnight = datetime.now(pytz.timezone(self.pipeline.timezone)).replace(hour=0, minute=0, second=0, microsecond=0)
return midnight - timedelta(days=int(self.pipeline.days_to_backfill))


Expand Down
12 changes: 1 addition & 11 deletions agent/src/agent/pipeline/config/stages/destination.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,15 @@ class WatermarkDestination(Stage):
def get_config(self) -> dict:
body = """{
"schemaId": "${SCHEMA_ID}",
"watermark": WATERMARK_EXPRESSION
"watermark": ${record:value('/watermark')}
}"""
watermark_expression = "${record:value('/watermark')}"
if self.pipeline.watermark_in_local_timezone:
watermark_expression = '${' + self._convert_watermark_to_timezone() + '}'

body = body.replace('WATERMARK_EXPRESSION', watermark_expression)

return {
self.pipeline.destination.CONFIG_ENABLE_REQUEST_LOGGING: self.pipeline.watermark_logs_enabled,
'conf.requestBody': body,
**self.pipeline.destination.config,
}

def _convert_watermark_to_timezone(self):
timezone = pytz.timezone(self.pipeline.timezone)
offset = timezone.utcoffset(datetime.utcnow()).total_seconds()
return f'record:value("/watermark") - ({int(offset)})'


class EventsDestination(Stage):
def get_config(self) -> dict:
Expand Down
2 changes: 1 addition & 1 deletion agent/src/agent/pipeline/config/stages/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def _get_script_params(self) -> list[dict]:
return [
{
'key': 'INITIAL_OFFSET',
'value': self.get_initial_timestamp().strftime('%d/%m/%Y %H:%M'),
'value': str(self.get_initial_timestamp().timestamp()),
},
{
'key': 'INTERVAL_IN_SECONDS',
Expand Down
1 change: 1 addition & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ services:

mysql:
image: mysql:8.0.21
platform: linux/amd64
container_name: agent-mysql
command: --default-authentication-plugin=mysql_native_password
environment:
Expand Down

0 comments on commit 7bcc798

Please sign in to comment.