
Commit

fix scrape paid along with other fixes for new manager
datawhores committed Aug 24, 2024
1 parent c8b6505 commit 040922a
Showing 7 changed files with 87 additions and 78 deletions.
6 changes: 5 additions & 1 deletion ofscraper/actions/actions/download/download.py
@@ -42,13 +42,17 @@ async def downloader(ele=None, posts=None, media=None, **kwargs):


 @run_async
-async def download_process(userdata, medialist, posts=None):
+async def download_process(userdata=None, medialist=None, posts=None):
     username = userdata["username"] if isinstance(userdata, dict) else userdata.name
     model_id = userdata["id"] if isinstance(userdata, dict) else userdata.id
     data, values = await download_picker(username, model_id, medialist, posts)
     post_user_script(userdata, medialist, posts=None)
     return data, values

+@run_async
+async def download_model_deleted_process(username,model_id, medialist=None, posts=None):
+    data, values = await download_picker(username, model_id, medialist, posts)
+    return data, values

 async def download_picker(username, model_id, medialist, posts):
     if (
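
The reworked download_process above accepts userdata as either a raw profile dict or a model object, normalizing both into a (username, model_id) pair. A minimal, self-contained sketch of that dict-or-object pattern (the class and values here are illustrative, not from the repo):

    def resolve_user(userdata):
        # same normalization the new download_process performs
        username = userdata["username"] if isinstance(userdata, dict) else userdata.name
        model_id = userdata["id"] if isinstance(userdata, dict) else userdata.id
        return username, model_id

    class FakeModel:
        # stand-in for the repo's model object, which exposes .name and .id
        name = "example_model"
        id = 12345

    assert resolve_user({"username": "example_model", "id": 12345}) == ("example_model", 12345)
    assert resolve_user(FakeModel()) == ("example_model", 12345)
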
8 changes: 4 additions & 4 deletions ofscraper/classes/placeholder.py
@@ -196,7 +196,7 @@ async def init(self):
         return self

     def add_price_variables(self, username):
-        modelObj = manager.Manager.model_manager.get_model_fromParsed(username)
+        modelObj = manager.Manager.model_manager.get_model(username)
         self._variables.update(
             {
                 "current_price": (
@@ -258,7 +258,7 @@ async def add_main_variables(self, ele, username, model_id):
         self._variables.update({"response_type": ele.modified_responsetype})
         self._variables.update({"label": ele.label_string})
         self._variables.update({"download_type": ele.downloadtype})
-        self._variables.update({"modelObj": manager.Manager.model_manager.get_model_fromParsed(username)})
+        self._variables.update({"modelObj": manager.Manager.model_manager.get_model(username)})
         self._variables.update({"quality": await ele.selected_quality_placeholder})
         self._variables.update({"file_name": await ele.final_filename})
         self._variables.update({"original_filename": ele.filename})
@@ -432,7 +432,7 @@ async def init(self):
         return self

     def add_price_variables(self, username):
-        modelObj = manager.Manager.model_manager.get_model_fromParsed(username)
+        modelObj = manager.Manager.model_manager.get_model(username)
         self._variables.update(
             {
                 "current_price": (
@@ -488,7 +488,7 @@ async def add_main_variables(self, ele, username, model_id):
         self._variables.update({"media_type": "Text"})
         self._variables.update({"response_type": ele.modified_responsetype})
         self._variables.update({"label": ele.label_string})
-        self._variables.update({"modelObj": manager.Manager.model_manager.get_model_fromParsed(username)})
+        self._variables.update({"modelObj": manager.Manager.model_manager.get_model(username)})
         self._variables.update({"text": ele.text_trunicate(ele.file_sanitized_text)})
         self._variables.update({"config": config_file.open_config()})
         self._variables.update({"args": read_args.retriveArgs()})
2 changes: 1 addition & 1 deletion ofscraper/classes/sessionmanager/sessionmanager.py
@@ -507,7 +507,7 @@ async def requests_async(
                 log.debug(f"[bold]status: [bold] {r.status}")
                 log.debug(f"[bold]response text [/bold]: {await r.text_()}")
                 log.debug(f"response headers {dict(r.headers)}")
-                log.debug(f"requests headers {dict(r.request.headers)}")
+                log.debug(f"requests headers mode{dict(r.request.headers)}")
                 r.raise_for_status()
             except Exception as E:
                 # only call from sync req like "me"
2 changes: 1 addition & 1 deletion ofscraper/commands/runners/check.py
@@ -154,7 +154,7 @@ def update_globals(model_id, username, post, media, values):
             "media": [],
             "username": username,
             "model_id": model_id,
-            "userdata": manager.Manager.model_manager.get_model_fromParsed(username),
+            "userdata": manager.Manager.model_manager.get_model(username),
             "results": values,
         },
     )
109 changes: 56 additions & 53 deletions ofscraper/data/api/paid.py
@@ -72,7 +72,7 @@ async def process_tasks(tasks):
                     log.traceback_(E)
                     log.traceback_(traceback.format_exc())
                     continue
-        tasks = new_tasks
+        tasks = new_tasks_batch

     progress_utils.remove_api_task(page_task)
     log.debug(
@@ -137,70 +137,72 @@ async def scrape_paid(c, username, offset=0):

 @run
 async def get_all_paid_posts():
-    data = await create_tasks_scrape_paid()
-    return create_all_paid_dict(data)
+    async with manager2.Manager.aget_ofsession(
+        sem_count=constants.getattr("SCRAPE_PAID_SEMS"),
+    ) as c:
+        tasks = await create_tasks_scrape_paid(c)
+        data= await process_tasks_all_paid(tasks)
+        return create_all_paid_dict(data)


-async def create_tasks_scrape_paid():
-    output = []
+async def create_tasks_scrape_paid(c):
     min_posts = 80
     tasks = []
-    page_count = 0
-    async with manager2.Manager.aget_ofsession(
-        sem_count=constants.getattr("SCRAPE_PAID_SEMS"),
-    ) as c:
-        allpaid = cache.get("purchased_all", default=[])
-        log.debug(f"[bold]All Paid Cache[/bold] {len(allpaid)} found")
-
-        if len(allpaid) > min_posts:
-            splitArrays = [i for i in range(0, len(allpaid), min_posts)]
-            [
-                tasks.append(
-                    asyncio.create_task(
-                        scrape_all_paid(
-                            c,
-                            required=min_posts,
-                            offset=splitArrays[i],
-                        )
-                    )
-                )
-                for i in range(0, len(splitArrays) - 1)
-            ]
-            tasks.append(
-                asyncio.create_task(
-                    scrape_all_paid(c, offset=splitArrays[-1], required=None)
-                )
-            )
-        else:
-            tasks.append(asyncio.create_task(scrape_all_paid(c)))
-
-        page_task = progress_utils.add_api_task(
-            f"[Scrape Paid] Pages Progress: {page_count}", visible=True
-        )
-        while tasks:
-            new_tasks = []
-            for task in asyncio.as_completed(tasks):
-                try:
-                    result, new_tasks_batch = await task
-                    page_count = page_count + 1
-                    progress_utils.update_api_task(
-                        page_task,
-                        description=f"[Scrape Paid] Pages Progress: {page_count}",
-                    )
-                    output.extend(result)
-                    log.debug(
-                        f"{common_logs.PROGRESS_IDS.format('ALL Paid')} {list(map(lambda x:x['id'],result))}"
-                    )
-                    trace_progress_log(f"{API} all users tasks", result)
-                    tasks.extend(new_tasks_batch)
-                except Exception as E:
-                    log.traceback_(E)
-                    log.traceback_(traceback.format_exc())
-            tasks = new_tasks_batch
-        progress_utils.remove_api_task(page_task)
+    allpaid = cache.get("purchased_all", default=[])
+    log.debug(f"[bold]All Paid Cache[/bold] {len(allpaid)} found")
+
+    if len(allpaid) > min_posts:
+        splitArrays = [i for i in range(0, len(allpaid), min_posts)]
+        [
+            tasks.append(
+                asyncio.create_task(
+                    scrape_all_paid(
+                        c,
+                        required=min_posts,
+                        offset=splitArrays[i],
+                    )
+                )
+            )
+            for i in range(0, len(splitArrays) - 1)
+        ]
+        tasks.append(
+            asyncio.create_task(
+                scrape_all_paid(c, offset=splitArrays[-1], required=None)
+            )
+        )
+    else:
+        tasks.append(asyncio.create_task(scrape_all_paid(c)))
+    return tasks
+
+
+async def process_tasks_all_paid(tasks):
+    output = []
+    page_count = 0
+    allpaid = cache.get("purchased_all", default=[])
+    log.debug(f"[bold]All Paid Cache[/bold] {len(allpaid)} found")
+    page_task = progress_utils.add_api_task(
+        f"[Scrape Paid] Pages Progress: {page_count}", visible=True
+    )
+    while tasks:
+        new_tasks_batch = []
+        for task in asyncio.as_completed(tasks):
+            try:
+                result, new_tasks_batch = await task
+                page_count = page_count + 1
+                progress_utils.update_api_task(
+                    page_task,
+                    description=f"[Scrape Paid] Pages Progress: {page_count}",
+                )
+                output.extend(result)
+                log.debug(
+                    f"{common_logs.PROGRESS_IDS.format('ALL Paid')} {list(map(lambda x:x['id'],result))}"
+                )
+                trace_progress_log(f"{API} all users tasks", result)
+            except Exception as E:
+                log.traceback_(E)
+                log.traceback_(traceback.format_exc())
+        tasks = new_tasks_batch
+    progress_utils.remove_api_task(page_task)

     log.debug(f"[bold]Paid Post count with Dupes[/bold] {len(output)} found")
     trace_log_raw(f"{API} all users final", output, final_count=True)
@@ -214,6 +216,7 @@ async def create_tasks_scrape_paid():
     return output


+
 def create_all_paid_dict(paid_content):
     user_dict = {}
     for ele in paid_content:
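
The refactor splits the old monolithic create_tasks_scrape_paid into task creation and a drain loop, with the session now opened by the caller. The drain follows the usual asyncio.as_completed pagination shape, where each page task can hand back follow-up tasks. A self-contained sketch of that shape, assuming a toy fetch_page in place of scrape_all_paid (and accumulating every continuation batch, the conventional form of the pattern):

    import asyncio

    async def fetch_page(offset):
        # toy stand-in for scrape_all_paid: returns (results, follow-up tasks)
        results = [f"post-{offset}", f"post-{offset + 1}"]
        more = [asyncio.create_task(fetch_page(offset + 2))] if offset < 4 else []
        return results, more

    async def drain(tasks):
        output = []
        while tasks:
            new_tasks_batch = []
            for task in asyncio.as_completed(tasks):
                result, follow_ups = await task
                output.extend(result)
                new_tasks_batch.extend(follow_ups)
            tasks = new_tasks_batch  # loop again on any continuation pages
        return output

    async def main():
        return await drain([asyncio.create_task(fetch_page(0))])

    print(asyncio.run(main()))  # post-0 through post-5
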
20 changes: 10 additions & 10 deletions ofscraper/data/models/manager.py
@@ -32,7 +32,7 @@ def get_num_selected(self):
         return len(self._parsed_subs)


-    def get_model_fromParsed(self,name):
+    def get_model(self,name):
         return self._all_subs_dict.get(name)


@@ -61,23 +61,23 @@ def all_subs_dict(self, value):
     def getselected_usernames(self,rescan=False, reset=False):
         # username list will be retrived every time resFet==True
         if reset is True and rescan is True:
-            self.all_subs_helper()
+            self.all_subs_retriver()
             self.parsed_subscriptions_helper(reset=True)
         elif reset is True and self._parsed_subs:
             prompt = prompts.reset_username_prompt()
             if prompt == "Selection":
-                self.all_subs_helper()
+                self.all_subs_retriver()
                 self.parsed_subscriptions_helper(reset=True)
             elif prompt == "Data":
-                self.all_subs_helper()
+                self.all_subs_retriver()
                 self.parsed_subscriptions_helper()
             elif prompt == "Selection_Strict":
                 self.parsed_subscriptions_helper(reset=True)
         elif rescan is True:
-            self.all_subs_helper()
+            self.all_subs_retriver()
             self.parsed_subscriptions_helper()
         else:
-            self.all_subs_helper(refetch=False)
+            self.all_subs_retriver(refetch=False)
             self.parsed_subscriptions_helper()
         return self._parsed_subs

@@ -101,13 +101,13 @@ async def set_data_all_subs_dict(self,username):

         args.usernames = new_names
         write_args.setArgs(args)
-        await self.all_subs_helper() if len(new_names) > 0 else None
+        await self.all_subs_retriver() if len(new_names) > 0 else None
         args.usernames = set(all_usernames)
         write_args.setArgs(args)


     @run
-    async def all_subs_helper(self,refetch=True):
+    async def all_subs_retriver(self,refetch=True):
         if bool(self.all_subs_dict) and not refetch:
             return
         while True:
@@ -172,7 +172,7 @@ def setfilter(self):
                 sorted(args.black_list)
             ) or not list(sorted(old_list)) == list(sorted(args.user_list)):
                 print("Updating Models")
-                self.all_subs_helper(rescan=True)
+                self.all_subs_retriver(rescan=True)
             elif choice == "list":
                 old_args = read_args.retriveArgs()
                 old_blacklist = old_args.black_list
@@ -182,7 +182,7 @@ def setfilter(self):
                 sorted(args.black_list)
             ) or not list(sorted(old_list)) == list(sorted(args.user_list)):
                 print("Updating Models")
-                self.all_subs_helper(rescan=True)
+                self.all_subs_retriver(rescan=True)
             elif choice == "select":
                 old_args = read_args.retriveArgs()
                 args = prompts.modify_list_prompt(old_args)
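
Behind the renames, all_subs_retriver keeps its memoization guard: it returns immediately when the subscription dict is already populated and no refetch was requested. A stripped-down sketch of that guard, with illustrative names:

    class SubManager:
        def __init__(self):
            self.all_subs_dict = {}

        def all_subs_retriver(self, refetch=True):
            # early-out: cached data present and caller did not force a refetch
            if bool(self.all_subs_dict) and not refetch:
                return self.all_subs_dict
            self.all_subs_dict = {"example_model": object()}  # stand-in for the real API fetch
            return self.all_subs_dict

    m = SubManager()
    first = m.all_subs_retriver(refetch=False)   # cache empty, so it fetches
    second = m.all_subs_retriver(refetch=False)  # cache hit, early return
    assert first is second
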
18 changes: 10 additions & 8 deletions ofscraper/data/posts/scrape_paid.py
@@ -1,10 +1,8 @@
 import logging

 import ofscraper.data.api.profile as profile
-import ofscraper.classes.models as models
 import ofscraper.data.posts.post as OF
 import ofscraper.actions.actions.download.download as download
-import ofscraper.data.models.manager as userselector
 import ofscraper.utils.live.screens as progress_utils
 import ofscraper.utils.live.updater as progress_updater
 from ofscraper.commands.utils.strings import (
@@ -16,6 +14,10 @@
 )
 from ofscraper.utils.context.run_async import run
 from ofscraper.runner.close.final.final_user import post_user_script
+import ofscraper.utils.constants as constants
+import ofscraper.runner.manager as manager
+
+


 log = logging.getLogger("shared")
@@ -24,7 +26,7 @@
 @run
 async def scrape_paid_all():
     out = ["[bold yellow]Scrape Paid Results[/bold yellow]"]
-
+    await manager.Manager.model_manager.all_subs_retriver()
     async for count, value, length in process_scrape_paid():
         process_user_info_printer(
             value,
@@ -93,11 +95,11 @@ async def process_user(value, length):
     username = value["username"]
     posts = value["posts"]
     medias = value["medias"]
-
-    userselector.set_ALL_SUBS_DICTVManger(
-        {username: models.Model(profile.scrape_profile(model_id))}
-    )
+    if username ==constants.getattr("DELETED_MODEL_PLACEHOLDER"):
+        data, _ =await download.download_model_deleted_process(username,model_id,medias)
+    else:
+        userdata=manager.Manager.model_manager.get_model(username) or profile.scrape_profile(username)
+        data, _ =await download.download_process(userdata, medias, posts=posts)
     progress_updater.increment_activity_count(total=length)
-    data, _ = await download.download_process(username, model_id, medias, posts=posts)
     post_user_script(value, medias, posts)
     return data
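
The new branch in process_user routes deleted models away from profile scraping: the placeholder username takes the bare-ID download path, while everyone else resolves a cached model object and falls back to a profile scrape. A self-contained sketch of that routing (the placeholder value and return strings are illustrative):

    DELETED_MODEL_PLACEHOLDER = "modeldeleted"  # illustrative; the real value comes from constants

    def route_download(username, cached_models):
        if username == DELETED_MODEL_PLACEHOLDER:
            return "deleted-path"                     # download_model_deleted_process
        # cached lookup first, profile scrape as fallback
        userdata = cached_models.get(username) or f"scraped:{username}"
        return f"normal-path:{userdata}"              # download_process

    assert route_download("modeldeleted", {}) == "deleted-path"
    assert route_download("alice", {"alice": "cached:alice"}) == "normal-path:cached:alice"
    assert route_download("bob", {}) == "normal-path:scraped:bob"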
