-
Notifications
You must be signed in to change notification settings - Fork 0
/
LangFlow Component X API
116 lines (101 loc) · 4.55 KB
/
LangFlow Component X API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from langflow.custom import Component
from langflow.inputs import StrInput, FileInput, BoolInput, MessageInput
from langflow.template import Output
from langflow.schema import Data
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
import requests
# Module-level scheduler: created and started as soon as this module is
# imported. SocialMediaAutomation.execute registers its cron jobs on it.
# NOTE(review): if the host application re-imports or re-executes this module,
# every run creates and starts another scheduler -- confirm that is acceptable.
scheduler = BackgroundScheduler()
scheduler.start()
class SocialMediaAutomation(Component):
    """
    A Langflow component for social media automation.

    Only the ``twitter`` platform with the ``post`` action is implemented.
    The tweet is either sent immediately or, when scheduling is enabled and a
    cron expression is supplied, registered on the module-level ``scheduler``.
    Every outcome is reported as a ``Data`` object carrying a ``status`` key.
    """
    display_name = "Social Media Automation"
    description = "Automates social media tasks."
    icon = "social_media"

    inputs = [
        StrInput(
            name="platform",
            display_name="Platform",
            info="The social media platform to automate.",
            value="twitter",  # Default platform
        ),
        StrInput(
            name="action",
            display_name="Action",
            info="The action to perform.",
            value="post",
            placeholder="e.g., post, like, follow",
        ),
        # Declared exactly once: input names must be unique within a component.
        MessageInput(
            name="message",
            display_name="Message",
            info="The message to post.",
        ),
        # TODO: add a FileInput named "image" once image upload is supported.
        BoolInput(
            name="schedule",
            display_name="Schedule",
            info="Whether to schedule the post.",
            value=False,
        ),
        StrInput(
            name="cron_schedule",
            display_name="Schedule (Cron)",
            info="Enter a cron expression for scheduling (e.g., '0 9 * * *' for 9:00 AM every day). Leave blank to post immediately.",
            value="",  # Empty string means "post immediately"
        ),
    ]

    outputs = [
        Output(name="output", display_name="Output", description="The output of the automation.", type="Data", method="execute"),
    ]

    def execute(self) -> Data:
        """
        Executes the social media automation.

        Returns:
            Data: a payload whose ``status`` entry describes the outcome
            (scheduled, posted, failed, or unsupported platform/action).
            Successful posts also carry ``message``; failures carry ``error``.
        """
        from langflow.schema.message import Message

        # Normalise the free-text inputs so None / stray whitespace / casing
        # do not break the comparisons below.
        platform = (self.platform or "").strip().lower()
        action = (self.action or "").strip().lower()
        schedule = self.schedule
        cron_schedule = (self.cron_schedule or "").strip()

        message_obj = self.message
        if isinstance(message_obj, Message):
            message = message_obj.text
        else:
            message = str(message_obj)

        if platform != "twitter" or action != "post":
            # TODO: Handle other platforms (e.g., Facebook, Instagram) and
            # actions (like, follow, comment, upload image etc.)
            return Data(value={"status": "Unsupported platform or action"})

        import tweepy

        # Authenticate to Twitter.
        # Replace with your actual Twitter API credentials.
        client = tweepy.Client(
            consumer_key="Key Here",
            consumer_secret="Key Here",
            access_token="Key Here",
            access_token_secret="Key Here",
        )

        def post_tweet():
            # Create a tweet with the text taken from the message input.
            # TODO: image support would need tweepy.API(...).media_upload and
            # passing media_ids to create_tweet.
            client.create_tweet(text=message)

        if schedule and cron_schedule:
            from apscheduler.triggers.cron import CronTrigger

            # The 'cron' trigger has no `cron_expression` keyword; a crontab
            # string has to be parsed with CronTrigger.from_crontab instead.
            try:
                trigger = CronTrigger.from_crontab(cron_schedule)
            except ValueError as e:
                return Data(value={"status": "Invalid cron expression: {}".format(cron_schedule),
                                   "error": str(e)})
            scheduler.add_job(post_tweet, trigger)
            return Data(value={"status": "Tweet scheduled using cron: {}".format(cron_schedule)})

        # No (usable) schedule requested: post immediately.
        try:
            post_tweet()
        except Exception as e:
            # Boundary of the component: report the failure to the flow
            # instead of crashing it. Use a dict (not a set) so the payload
            # is ordered and serialisable.
            return Data(value={"status": "Error sending tweet", "error": str(e)})
        return Data(value={"status": "Tweet successfully posted", "message": message})