feat: add features and feature_view #528

Open · wants to merge 3 commits into base: main
@@ -12,3 +12,48 @@
"shopify_customer_id",
]
ID_GRAPH_MODEL_SUFFIX = "id_graph"
PRE_DEFINED_FEATURES = [
{
"name": "number_of_devices_purchased",
"select": "count(distinct device_id)",
"from": "inputs/rsTracks_pb_tutorial",
"description": "total number of devices bought by each customer",
},
{
"name": "last_order_date",
"select": "max(timestamp)",
"from": "inputs/rsTracks_pb_tutorial",
"description": "timestamp of most recent order per customer",
},
{
"name": "days_since_last_order",
"select": "datediff(day, {{user.last_order_date}}, current_date)",
"description": "timestamp of most recent order per customer",
},
]
USER_DEFINED_FEATURES = [
{
"name": "account_creation_date",
"select": "min(TIMESTAMP)",
"from": "inputs/rsIdentifies_pb_tutorial",
"description": "account creation date for each customer",
"user_prompt": "From our source data, we know that the account creation is coming from inputs/rsIdentifies_pb_tutorial. We want to select the first timestamp per user from this source table. We will use a min() function in order to do this. ",
},
{
"name": "last_seen_timestamp",
"select": "last_value(timestamp)",
"from": "inputs/rsPages_pb_tutorial",
"window": {
"order_by": ["timestamp asc"],
},
"user_prompt": "We will now create a feature that uses a window function to show you the structure. Let's now create the last_seen_date.",
"order_by_prompt": "Given that profiles will automatically partion the user by the user_main_id, let's now order this partition in the correct order so that we can ensure that the timestamp we are selecting within this partition is indeed the last event record with the last timestamp, per user.",
},
{
"name": "total_revenue",
"select": "sum(INVOICE_COST)",
"from": "inputs/rsTracks_pb_tutorial",
"description": "total revenue per user",
"user_prompt": "Now, we want to build a feature that outputs the total revenue for each user. Here, we will use a sum() function",
},
]
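
For reference, a minimal sketch (hypothetical, standalone) of how one of these dicts round-trips into the entity_var block that add_features, further down in this diff, writes to profiles.yaml; the var_group name follows the f"{entity_name}_features" pattern used there.

import yaml

# Hypothetical standalone illustration: mirrors how add_features converts a
# feature dict into an entity_var entry under var_groups in profiles.yaml.
feature = {
    "name": "last_seen_timestamp",
    "select": "last_value(timestamp)",
    "from": "inputs/rsPages_pb_tutorial",
    "window": {"order_by": ["timestamp asc"]},
}
# Copy over only the keys the generator recognizes, preserving their order.
entity_var = {
    key: feature[key]
    for key in ("name", "select", "from", "description", "window", "where")
    if key in feature
}
var_groups = [
    {"name": "user_features", "entity_key": "user", "vars": [{"entity_var": entity_var}]}
]
print(yaml.dump({"var_groups": var_groups}, sort_keys=False))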
@@ -322,9 +322,78 @@
"""
)

CONCLUSION_MESSAGE: Final[
THIRD_RUN_CONCLUSION_MESSAGE: Final[
str
] = """
Great! We can see that even the user_main_ids with the highest counts are within the threshold we would expect.
Now that Secure Solutions, LLC has a solid id graph, they can unlock major value-add projects for the business, like building an accurate 360-degree view of their customers, predicting accurate churn scores, modeling marketing attribution, and more.
Let's move on to the final section to build a handful of useful features for our user entity.
"""


FEATURE_CREATION_INTRO: Callable[[str], str] = (
lambda entity: f"""In this final section, we will walk through feature creation:
Now that we have a solid id graph, we can build accurate features (or attributes) for your users. Generally, when building features for users, you are either aggregating data or performing window functions across your tables. In either case, you use an identifier to `group by` or `partition by` the user.
Given that you may have many different data tables across many different sources, each with multiple different identifiers for a single customer, no single customer identifier column will suffice to group or partition by. That is why we built the ID Graph first. Building an ID Graph was really a means to an end: generating a unifying key that connects all identifiers for each customer across all sources. Now that we have a key (`{entity}_main_id`) that unifies users across disparate sources, we will use it to `group by` or `partition by`.

Within profiles, each customer feature has its own definition called an entity_var. These definitions are structured similarly to SQL select queries, so you have a lot of flexibility in the structure and the output. You can derive simple values like timestamps, integers, and strings, as well as more complex features using array, JSON, or variant SQL functions. The exact SQL syntax will differ depending on which warehouse platform you are using.

Within the profiles project configuration, each entity_var definition will have the following structure:

1. NAME: This will be the column alias in the output c360 feature view
2. SELECT: This is a select statement that tells profiles what column or combination of columns to select from an input source along with what aggregate or window function to use.
3. FROM: The input source for profiles to query in order to derive the values

Note: It is implicit that you will be aggregating or partitioning data from your sources, so you have to use either an aggregation or a window function within the select of an entity_var definition. When profiles runs the SQL query on the source table, it will automatically group by or partition by the {entity}_main_id. You will be able to observe the SQL generated from your configuration within the outputs directory after we build the entity_vars and perform a pb run.

entity_var definitions are really flexible, and there are many optional keys you can add to fine-tune the SQL query and derive the desired output for your customers. Visit our docs to learn more about how to create features. (ref: https://www.rudderstack.com/docs/profiles/core-concepts/feature-development/#define-features)"""
)
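
To make the NAME/SELECT/FROM structure concrete, here is a hedged illustration of the kind of grouped query an aggregate entity_var implies. The SQL profiles actually generates is warehouse-specific and will differ; the input-to-table mapping below is a placeholder.

# Illustration only: conceptually, an aggregate entity_var becomes a select
# grouped by the stitched <entity>_main_id.
entity = "user"
feature = {
    "name": "number_of_devices_purchased",
    "select": "count(distinct device_id)",
    "from": "inputs/rsTracks_pb_tutorial",
}
source_table = feature["from"].split("/")[-1]  # hypothetical input-to-table mapping
sql_sketch = (
    f"SELECT {entity}_main_id, {feature['select']} AS {feature['name']}\n"
    f"FROM {source_table}\n"
    f"GROUP BY {entity}_main_id"
)
print(sql_sketch)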


FEATURE_DETAILS: Final[
str
] = """
To start, Secure Solutions, LLC wants to build 6 features for their customers in order to power personalized content across their email and ad marketing efforts.

The 6 features they want to build are:

1. Account Creation Date: We want to know the date customers created their account
2. Last Seen Date: We want to know the timestamp of the last site visit for each customer
3. Total Revenue: We want to know the total dollars spent per customer, if any
4. Number of Devices Purchased: We want to know how many devices each customer has purchased and enrolled in the security subscription app
5. Last Order Date: The last date each customer placed an order
6. Days Since Last Order: The number of days since a customer has placed an order.

Let's build the first few features from the list above, and then we will auto-fill the last 3 for reference.
"""

FEATURES_ADDED: Final[
str
] = """
Great!! You have now built 3 features. For the sake of time, we auto-filled the final 3 entity_vars within the profiles.yaml file. You can check that file now to see the yaml structure of the ones you created as well as the 3 we auto-filled.

Take note of the entity_vars we auto-filled to learn more about the structure of entity_vars and how you can perform different SQL functions.

One entity_var definition we want to point out is the days_since_last_order var. Notice how that entity_var does not have a FROM key within its definition. That is because the data source for this calculation is not one of the input sources. Rather, we are performing a date calculation on the underlying c360 table, which is generated during each run. You can also see that we are referencing another previously defined entity_var within the select statement in order to perform our date calculation. This is an important concept to familiarize yourself with because it gives you more flexibility in what features you can build for your entities.
"""


DEFINE_FEATURE_VIEW: Final[
str
] = """
This is the final step!

Now that we have defined 6 entity_vars, we want to create a view in the warehouse that outputs these features for each user. Profiles has a concept known as a feature_view, defined at the entity level within the pb_project.yaml. A feature view outputs one record per user, and each column is a feature on that user: the entity_vars we defined in the previous step.

We will create two feature views in this section. One is the default feature view, which is created automatically. This default feature view then serves as a base view from which you can generate custom feature views.

The default feature view will have the generated user_main_id as its primary key. Because this key is strictly internal to profiles (you will not use it in downstream systems), you can also create a custom feature view using any of the id types from your entity's id type list as the primary key.
"""

CONCLUSION: Final[
str
] = """
Great!
Secure Solutions, LLC now has a solid and accurate ID Graph modeled around their user entity. We then built a feature view on top of the id graph with accurate and helpful traits for our users, which we can use to power our personalization efforts.
We hope that you can take this knowledge and apply it to your own customer data!!
"""
@@ -14,6 +14,8 @@
SAMPLE_DATA_DIR,
TABLE_SUFFIX,
ID_GRAPH_MODEL_SUFFIX,
PRE_DEFINED_FEATURES,
USER_DEFINED_FEATURES,
)


@@ -47,9 +49,11 @@ def run(self, material: WhtMaterial):
self.map_tables_to_id_types(relevant_tables, id_types, entity_name)
# profiles.yaml
self.build_id_stitcher_model(relevant_tables, entity_name, id_graph_model)
self.pb_runs(entity_name, id_graph_model, target)
self.pb_runs(entity_name, id_types, id_graph_model, target)

def pb_runs(self, entity_name, id_graph_model: str, target: str):
def pb_runs(
self, entity_name, id_types: list[str], id_graph_model: str, target: str
):
self.io.display_multiline_message(messages.ABOUT_PB_COMPILE)
self.io.get_user_input("Enter `pb compile`", options=["pb compile"])
os.chdir("profiles")
@@ -80,11 +84,13 @@ def pb_runs(self, entity_name, id_graph_model: str, target: str):
seq_no,
target,
)
# Add features and create feature views
self.fourth_run(entity_name, id_types, target)

def explain_pb_compile_results(
self, target: str, pb_compile_output: str, id_graph_model: str
):
seq_no, _ = self.parse_pb_output_text(pb_compile_output, id_graph_model)
seq_no, _ = self.parse_material_name(pb_compile_output, id_graph_model)
self.io.display_multiline_message(
messages.EXPLAIN_PB_COMPILE_RESULTS(target, seq_no)
)
@@ -225,22 +231,22 @@ def _subprocess_run(self, args):
), f"Command {args} failed with error: {response.stderr}"
return response.stdout

def parse_pb_output_text(self, pb_output_text: str, id_graph_name: str):
def parse_material_name(self, pb_output_text: str, model_name: str):
seq_no_pattern = r"--seq_no (\d+)"
match = re.search(seq_no_pattern, pb_output_text)
assert match, f"Failed to find seq_no in the pb output {pb_output_text}"
seq_no = match.group(1)
pattern = rf"(Material_{id_graph_name}_[a-f0-9]+_\d+)"
pattern = rf"(Material_{model_name}_[a-f0-9]+_\d+)"
match = re.search(pattern, pb_output_text)
assert (
match
), f"Failed to find id_graph_table_name in the pb output {pb_output_text}"
id_graph_table_name = match.group(1)
return int(seq_no), id_graph_table_name
material_name = match.group(1)
return int(seq_no), material_name
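
A quick usage sketch of the renamed parser logic against a made-up pb output line; the real output format is assumed only to the extent the two regexes above encode it (a --seq_no flag and a Material_<model>_<hash>_<seq> name).

import re

# Made-up sample line, shaped only by the two patterns parse_material_name uses.
sample_output = "pb run finished: use --seq_no 12 to inspect Material_user_id_graph_3f9ac1d2_12"
seq_no = int(re.search(r"--seq_no (\d+)", sample_output).group(1))
material = re.search(r"(Material_user_id_graph_[a-f0-9]+_\d+)", sample_output).group(1)
assert (seq_no, material) == (12, "Material_user_id_graph_3f9ac1d2_12")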

def prompt_to_do_pb_run(
self,
id_graph_name: str,
model_name: str,
target: str,
command: str = "pb run",
):
@@ -253,11 +259,9 @@
[*command.split(), "--target", target, "--migrate_on_load"]
)
os.chdir("..")
seq_no, id_graph_table_name = self.parse_pb_output_text(
pb_run_output, id_graph_name
)
seq_no, material_name = self.parse_material_name(pb_run_output, model_name)
self.io.display_message("Done")
return seq_no, id_graph_table_name
return seq_no, material_name

def explain_pb_run_results(self, entity_name: str, id_stitcher_table_name: str):
self.io.display_multiline_message(
@@ -441,18 +445,19 @@ def second_run(
SUM(user_id) AS user_id_count,
SUM(shopify_customer_id) AS shopify_customer_id_count,
count(*) as total_count,
{{% if warehouse.DatabaseType() == 'snowflake' %}}
ARRAY_SORT(ARRAY_AGG(OBJECT_CONSTRUCT(other_id_type, id_other))) AS id_list
{{% elif warehouse.DatabaseType() == 'redshift' %}}
JSON_PARSE('[' || LISTAGG('{{"' || other_id_type || '"' || ':' || id_other || '}}', ', ') WITHIN GROUP (ORDER BY other_id_type) || ']') as id_list
{{% elif warehouse.DatabaseType() == 'databricks' %}}
COLLECT_LIST(map(other_id_type, id_other)) as id_list
{{% elif warehouse.DatabaseType() == 'bigquery' %}}
ARRAY_AGG(STRUCT(other_id_type, id_other) ORDER BY other_id_type) as id_list
{{% endif %}}
FROM id_value_counts
group by 1,2
order by email_count desc"""
# Id Lists
# {{% if warehouse.DatabaseType() == 'snowflake' %}}
# ARRAY_SORT(ARRAY_AGG(OBJECT_CONSTRUCT(other_id_type, id_other))) AS id_list
# {{% elif warehouse.DatabaseType() == 'redshift' %}}
# JSON_PARSE('[' || LISTAGG('{{"' || other_id_type || '"' || ':' || id_other || '}}', ', ') WITHIN GROUP (ORDER BY other_id_type) || ']') as id_list
# {{% elif warehouse.DatabaseType() == 'databricks' %}}
# COLLECT_LIST(map(other_id_type, id_other)) as id_list
# {{% elif warehouse.DatabaseType() == 'bigquery' %}}
# ARRAY_AGG(STRUCT(other_id_type, id_other) ORDER BY other_id_type) as id_list
# {{% endif %}}
query_investigate_bad_anons_sql = (
self.db_manager.material.execute_text_template(
query_investigate_bad_anons, skip_material_wrapper=True
@@ -508,4 +513,74 @@ def third_run(

result = self.cluster_size_analysis(entity_name, id_stitcher_table_name_3)
self.io.display_message(result.head(20).to_string())
self.io.display_multiline_message(messages.CONCLUSION_MESSAGE)
self.io.display_multiline_message(messages.THIRD_RUN_CONCLUSION_MESSAGE)

def fourth_run(
self,
entity_name: str,
id_types: list[str],
target: str,
):
self.io.display_multiline_message(messages.FEATURE_CREATION_INTRO(entity_name))

for feature in USER_DEFINED_FEATURES:
self.io.display_message(feature["user_prompt"])
self.io.get_user_input(
"Let's input the name of this feature for our c360 view:\n>",
options=[feature["name"]],
default=feature["name"],
)
self.io.get_user_input(
"Enter the aggregation function to use as well as the column to select from:\n>",
options=[feature["select"]],
default=feature["select"],
)
if "order_by_prompt" in feature:
self.io.get_user_input(
f"{feature['order_by_prompt']}\n>",
options=[feature["window"]["order_by"][0]],
default=feature["window"]["order_by"][0],
)

self.io.get_user_input(
"Now, let's enter the data source for this feature:\n>",
options=[feature["from"]],
default=feature["from"],
)

self.yaml_generator.add_features(
entity_name, [*USER_DEFINED_FEATURES, *PRE_DEFINED_FEATURES]
)
self.io.display_multiline_message(messages.FEATURES_ADDED)
self.io.display_multiline_message(messages.DEFINE_FEATURE_VIEW)

feature_view_name = self.io.get_user_input(
"Let's define this customer feature view with a name as well as the id_type we want to use as our primary key:\n>",
options=[f"customers_by_{id_type}" for id_type in id_types],
default=f"customers_by_email",
)
feature_view_using_id = self.io.get_user_input(
"Now, let's choose what id_type we want to use for the primary key:\n>",
options=id_types,
default="email",
)
self.yaml_generator.add_feature_views(
entity_name,
[{"id": feature_view_using_id, "name": feature_view_name}],
)
self.io.display_message(
"You can now look at your pb_project.yaml and see the defined feature view. Again, this will output 2 views. The default view created automatically, and the customer one you defined here. "
)

seq_no, feature_view_table_name = self.prompt_to_do_pb_run(
feature_view_name, target
)
self.io.display_message(
"Great job! You have now created two feature views. Let's query the custom feature view you created."
)

sql = f"select * from {self.db_manager.get_qualified_name(feature_view_table_name)}"
res = self.db_manager.client.query_sql_with_result(sql)
self.io.display_message(res.head(10).to_string())

self.io.display_multiline_message(messages.CONCLUSION)
@@ -113,7 +113,46 @@ def update_bad_anons_filter(self):
with open(CONFIG_FILE_PATH, "w") as file:
yaml.dump(pb_project, file)

def guide_id_type_input(self, entity_name):
def add_features(self, entity_name: str, features: list[dict]):
entity_vars = []
for feature in features:
entity_var = {
"name": feature["name"],
"select": feature["select"],
}
if "from" in feature:
entity_var["from"] = feature["from"]
if "description" in feature:
entity_var["description"] = feature["description"]
if "window" in feature:
entity_var["window"] = feature["window"]
if "where" in feature:
entity_var["where"] = feature["where"]

entity_vars.append({"entity_var": entity_var})

with open(PROFILES_FILE_PATH, "r") as file:
profiles = yaml.safe_load(file)

profiles["var_groups"] = [
{"name": f"{entity_name}_features", "entity_key": entity_name, "vars": vars}
]
with open(PROFILES_FILE_PATH, "w") as file:
yaml.dump(profiles, file)

def add_feature_views(self, entity_name: str, using_ids: list[dict]):
with open(CONFIG_FILE_PATH, "r") as file:
pb_project = yaml.safe_load(file)

entities = pb_project["entities"]
for entity in entities:
if entity["name"] == entity_name:
entity["feature_views"] = {"using_ids": using_ids}

with open(CONFIG_FILE_PATH, "w") as file:
yaml.dump(pb_project, file)

def guide_id_type_input(self, entity_name) -> list[str]:
about_id_types = {
"anon_id": """
RudderStack creates a cookie for each user when they visit your site.
@@ -146,6 +185,8 @@

user_input = self.io.get_user_input(
f"\nLet's add '{expected_id_type}' as an id type for {entity_name}: ",
default=expected_id_type,
options=[expected_id_type],
)
selected_id_types.append(user_input)
