Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink not working with column with datatype 'timestamp' and python producer #15

Open
saveriogzz opened this issue Jul 20, 2021 · 1 comment

Comments

@saveriogzz
Copy link

Hello,

I'd like an Astra sink to consume some data. The sink should pull data to an Astra table with some columns having type timestamp but this is not working.

CREATE TABLE streaming_test.test_streaming_timestamp (
    sensor text,
    added timeuuid,
    temperature int,
    timestamp timestamp,
    PRIMARY KEY (sensor, added)
)

My sink's mapping is
sensor=value.sensor, added=now(), temperature=value.temperature, timestamp=value.timestamp

My test python producer script is

import pulsar
import uuid
from pulsar.schema import *
from datetime import datetime

class Example(Record):
    sensor = String()
    temperature = Integer()
    timestamp = String()

service_url = 'pulsar+ssl://my.cloud.somewhere.my.streaming.example:port'

# Use default CA certs for your environment
# Debian/Ubuntu:
trust_certs='/etc/ssl/certs/ca-certificates.crt'

token='myToken'
client = pulsar.Client(service_url,
                        authentication=pulsar.AuthenticationToken(token),
                        tls_trust_certs_file_path=trust_certs)


producer = client.create_producer('persistent://my-tenant/my-namespame/my-topic', schema=JsonSchema(Example))

now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

producer.send(Example(sensor=str(uuid.uuid4()), temperature=20, timestamp=now))

client.close()

Referring to the pulsar python client (definition.py), I cannot see any ways to specify Timestamp() in the schema. That is the reason why I am specifying timestamp as a String(), but still does not work. What am I missing?

Thanks a lot

@STomashevych
Copy link

Timestamp is stored as Integer in Cassandra, so converting it to int like this timestamp = int(time.time())*1000 works for me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants