Commit 0232817
final changes to sql: use dicts rather than tuples for fewer bugs
maximize use of async in sql queries
improve sql queries with multiple possibilities, i.e. add_columns and certain transition queries
fix bugs related to the above fixes
datawhores committed Apr 1, 2024
1 parent d0fd42c commit 0232817
Showing 14 changed files with 349 additions and 316 deletions.
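
Aside (editor's sketch, not part of the diff): the "dict rather than tuple" change means DB helpers return rows keyed by column name instead of positional tuples, so callers can write row.get("created_at") instead of fragile indexes like x[12]. A minimal illustration of that pattern, assuming an aiosqlite-backed operations layer; the helper name mirrors the diff, but ofscraper's actual schema and db path resolution may differ:

    import sqlite3
    import aiosqlite

    async def get_messages_post_info(model_id=None, username=None):
        # hypothetical db path; ofscraper resolves its own per-model database file
        async with aiosqlite.connect(f"{username}_{model_id}.db") as conn:
            conn.row_factory = sqlite3.Row  # rows become addressable by column name
            async with conn.execute(
                "SELECT post_id AS id, created_at, downloaded FROM messages"
            ) as cursor:
                rows = await cursor.fetchall()
        # plain dicts keep the dict-style access after the connection closes
        return [dict(row) for row in rows]
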
Empty file added cd
Empty file.
13 changes: 5 additions & 8 deletions ofscraper/actions/scraper.py
@@ -98,10 +98,7 @@ async def process_paid_post(model_id, username, c):
paid_content,
model_id=model_id,
username=username,
- )
- operations.update_posts_table_helper(model_id=model_id,
- username=username)
-
+ )
output = []
[output.extend(post.media) for post in paid_content]
log.debug(f"[bold]Paid media count without locked[/bold] {len(output)}")
@@ -138,7 +135,7 @@ async def process_stories(model_id, username, c):
stories,
)
)
- operations.make_stories_tables_changes(
+ await operations.make_stories_tables_changes(
stories,
model_id=model_id,
username=username,
@@ -178,7 +175,7 @@ async def process_highlights(model_id, username, c):
highlights_,
)
)
- operations.make_stories_tables_changes(
+ await operations.make_stories_tables_changes(
highlights_,
model_id=model_id,
username=username,
@@ -405,11 +402,11 @@ async def process_all_paid():
output = []
for model_id, value in user_dict.items():
username = profile.scrape_profile(model_id).get("username")
- if username == "modeldeleted" and operations.check_profile_table_exists(
+ if username == "modeldeleted" and await operations.check_profile_table_exists(
model_id=model_id, username=username
):
username = (
- operations.get_profile_info(model_id=model_id, username=username)
+ await operations.get_profile_info(model_id=model_id, username=username)
or username
)
log.info(f"Processing {username}_{model_id}")
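
The scraper.py changes above all follow one pattern: DB writes such as make_stories_tables_changes are now coroutines, so every call site gains an await. A hedged sketch of what such a helper can look like with aiosqlite; the table, columns, and story shape here are illustrative, not ofscraper's actual schema:

    import aiosqlite

    async def make_stories_tables_changes(stories, model_id=None, username=None):
        async with aiosqlite.connect(f"{username}_{model_id}.db") as conn:
            # named placeholders pair naturally with dict-shaped rows
            await conn.executemany(
                "INSERT OR IGNORE INTO stories (post_id, created_at, model_id) "
                "VALUES (:id, :created_at, :model_id)",
                [
                    {"id": s["id"], "created_at": s["created_at"], "model_id": model_id}
                    for s in stories
                ],
            )
            await conn.commit()
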
8 changes: 4 additions & 4 deletions ofscraper/api/archive.py
@@ -48,7 +48,7 @@ async def get_archived_media(model_id, username, forced_after=None, c=None):
overall_progress = progress_utils.overall_progress

oldarchived = (
- operations.get_archived_postinfo(model_id=model_id, username=username)
+ await operations.get_archived_postinfo(model_id=model_id, username=username)
if not read_args.retriveArgs().no_cache
else []
)
@@ -62,7 +62,7 @@ async def get_archived_media(model_id, username, forced_after=None, c=None):
log.debug(f"[bold]Archived Cache[/bold] {len(oldarchived)} found")
oldarchived = list(filter(lambda x: x != None, oldarchived))

- after = get_after(model_id, username, forced_after)
+ after = await get_after(model_id, username, forced_after)
splitArrays = get_split_array(oldarchived, username, after)
tasks=get_tasks(splitArrays, c, model_id, job_progress, after)

@@ -230,7 +230,7 @@ def set_check(unduped, model_id, after):
cache.close()


- def get_after(model_id, username, forced_after=None):
+ async def get_after(model_id, username, forced_after=None):
if forced_after != None:
return forced_after
elif read_args.retriveArgs().after == 0:
@@ -256,7 +256,7 @@ def get_after(model_id, username, forced_after=None):
missing_items = list(sorted(missing_items, key=lambda x: x.get('created_at')))
if len(missing_items) == 0:
log.debug("Using last db date because,all downloads in db marked as downloaded")
- return operations.get_last_archived_date(model_id=model_id, username=username)
+ return await operations.get_last_archived_date(model_id=model_id, username=username)
else:
log.debug(
f"Setting date slightly before earliest missing item\nbecause {len(missing_items)} posts in db are marked as undownloaded"
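
In archive.py the conversion ripples upward: because operations.get_last_archived_date is awaited, get_after itself becomes async, and its callers await it in turn. A sketch of the kind of query the resume date could come from, again assuming aiosqlite and an illustrative posts table rather than ofscraper's real one:

    import aiosqlite

    async def get_last_archived_date(model_id=None, username=None):
        async with aiosqlite.connect(f"{username}_{model_id}.db") as conn:
            async with conn.execute(
                "SELECT MAX(created_at) FROM posts WHERE archived = 1"
            ) as cursor:
                (last,) = await cursor.fetchone()
        # fall back to 0 (scrape everything) when the table is empty
        return last or 0
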
33 changes: 16 additions & 17 deletions ofscraper/api/messages.py
@@ -53,9 +53,9 @@ async def get_messages_progress(model_id, username, forced_after=None, c=None):
overall_progress = progress_utils.overall_progress

before = (read_args.retriveArgs().before or arrow.now()).float_timestamp
- after = get_after(model_id, username, forced_after)
+ after = await get_after(model_id, username, forced_after)
oldmessages = (
- operations.get_messages_post_info(model_id=model_id, username=username)
+ await operations.get_messages_post_info(model_id=model_id, username=username)
if not read_args.retriveArgs().no_cache
else []
)
@@ -128,7 +128,7 @@ async def get_messages(model_id, username, forced_after=None, c=None):
responseArray = []

oldmessages = (
- operations.get_messages_post_info(model_id=model_id, username=username)
+ await operations.get_messages_post_info(model_id=model_id, username=username)
if not read_args.retriveArgs().no_cache
else []
)
@@ -205,11 +205,11 @@ async def get_messages(model_id, username, forced_after=None, c=None):


def get_filterArray(after, before, oldmessages):
- oldmessages = list(filter(lambda x: (x.get("date")) != None, oldmessages))
+ oldmessages = list(filter(lambda x: (x.get("created_at")) != None, oldmessages))
log.debug(f"[bold]Messages Cache[/bold] {len(oldmessages)} found")
oldmessages = sorted(
oldmessages,
- key=lambda x: arrow.get(x.get("date")).float_timestamp,
+ key=lambda x: x.get("created_at"),
reverse=True,
)
if after > before:
@@ -225,16 +225,16 @@ def get_i(oldmessages, before):
iterate through posts until a date less then or equal
to before , set index to -1 this point
"""
- if before >= oldmessages[1].get("date"):
+ if before >= oldmessages[1].get("created_at"):
return 0
- if before <= oldmessages[-1].get("date"):
+ if before <= oldmessages[-1].get("created_at"):
return len(oldmessages) - 2
# Use a generator expression for efficiency
return max(
next(
index - 1
for index, message in enumerate(oldmessages)
- if message.get("date") <= before
+ if message.get("created_at") <= before
),
0,
)
@@ -245,15 +245,15 @@ def get_j(oldmessages, after):
iterate through posts until a date less then or equal
to after , set index to +1 this point
"""
- if after >= oldmessages[0].get("date"):
+ if after >= oldmessages[0].get("created_at"):
return 0
- if after < oldmessages[-1].get("date"):
+ if after < oldmessages[-1].get("created_at"):
return len(oldmessages) - 1
return min(
next(
index + 1
for index, message in enumerate(oldmessages)
- if message.get("date") <= after
+ if message.get("created_at") <= after
),
len(oldmessages) - 1,
)
@@ -280,8 +280,7 @@ def get_tasks(splitArrays, filteredArray, oldmessages, model_id, job_progress, c
message_id=splitArrays[0][0].get("id")
if len(filteredArray) == len(oldmessages)
else None,
- required_ids=set([ele.get("date") for ele in splitArrays[0]]),
- offset=True,
+ required_ids=set([ele.get("created_at") for ele in splitArrays[0]]),
)
)
)
@@ -293,7 +292,7 @@ def get_tasks(splitArrays, filteredArray, oldmessages, model_id, job_progress, c
model_id,
job_progress=job_progress,
message_id=splitArrays[i - 1][-1].get("id"),
- required_ids=set([ele.get("date") for ele in splitArrays[i]]),
+ required_ids=set([ele.get("created_at") for ele in splitArrays[i]]),
)
)
)
@@ -307,7 +306,7 @@ def get_tasks(splitArrays, filteredArray, oldmessages, model_id, job_progress, c
model_id,
job_progress=job_progress,
message_id=splitArrays[-2][-1].get("id"),
- required_ids=set([ele.get("date") for ele in splitArrays[-1]]),
+ required_ids=set([ele.get("created_at") for ele in splitArrays[-1]]),
)
)
)
@@ -510,7 +509,7 @@ def get_individual_post(model_id, postid):
log.debug(f"[bold]Individual message headers:[/bold] {r.headers}")


- def get_after(model_id, username, forced_after=None):
+ async def get_after(model_id, username, forced_after=None):
if forced_after != None:
return forced_after
elif read_args.retriveArgs().after == 0:
@@ -537,7 +536,7 @@ def get_after(model_id, username, forced_after=None):
"Using last db date because,all downloads in db are marked as downloaded"
)
return arrow.get(
- operations.get_last_message_date(model_id=model_id, username=username)
+ await operations.get_last_message_date(model_id=model_id, username=username)
).float_timestamp
else:
log.debug(
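
The messages.py diff swaps the string "date" field for the numeric "created_at" column throughout, which is why the sorted() key no longer needs arrow parsing. As a worked illustration of the window get_i/get_j carve out of the descending cache: the commit's own code walks the list with a generator and deliberately widens the window by one row at each end (index - 1 / index + 1); this simplified sketch uses bisect on negated keys for the core slice instead:

    from bisect import bisect_left, bisect_right

    def filter_window(oldmessages, after, before):
        # oldmessages is sorted by created_at descending, so negated keys ascend
        keys = [-m["created_at"] for m in oldmessages]
        i = bisect_left(keys, -before)  # first row with created_at <= before
        j = bisect_right(keys, -after)  # one past the last row with created_at >= after
        return oldmessages[i:j]

    rows = [{"id": n, "created_at": t}
            for n, t in [(1, 500.0), (2, 400.0), (3, 300.0), (4, 200.0)]]
    assert [m["id"] for m in filter_window(rows, after=250.0, before=450.0)] == [2, 3]
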
16 changes: 8 additions & 8 deletions ofscraper/api/timeline.py
@@ -47,9 +47,9 @@ async def get_timeline_media_progress(model_id, username, forced_after=None, c=None):

job_progress = progress_utils.timeline_progress
overall_progress = progress_utils.overall_progress
- after = get_after(model_id, username, forced_after)
+ after = await get_after(model_id, username, forced_after)

- splitArrays = get_split_array(model_id, username, after)
+ splitArrays = await get_split_array(model_id, username, after)
tasks = get_tasks(splitArrays, c, model_id, job_progress, after)
page_task = overall_progress.add_task(
f" Timeline Content Pages Progress: {page_count}", visible=True
@@ -113,7 +113,7 @@ async def get_timeline_media(model_id, username, forced_after=None, c=None):
responseArray = []
page_count = 0
if not read_args.retriveArgs().no_cache:
- oldtimeline = operations.get_timeline_postinfo(
+ oldtimeline = await operations.get_timeline_postinfo(
model_id=model_id, username=username
)
else:
@@ -127,7 +127,7 @@ async def get_timeline_media(model_id, username, forced_after=None, c=None):
)
log.debug(f"[bold]Timeline Cache[/bold] {len(oldtimeline)} found")
oldtimeline = list(filter(lambda x: x != None, oldtimeline))
- after = get_after(model_id, username, forced_after)
+ after = await get_after(model_id, username, forced_after)

splitArrays = get_split_array(model_id, username, after)
tasks = get_tasks(splitArrays, c, model_id, job_progress, after)
@@ -175,11 +175,11 @@ async def get_timeline_media(model_id, username, forced_after=None, c=None):
return list(unduped.values())


- def get_split_array(model_id, username, after):
+ async def get_split_array(model_id, username, after):
min_posts = 50

if not read_args.retriveArgs().no_cache:
- oldtimeline = operations.get_timeline_postinfo(
+ oldtimeline = await operations.get_timeline_postinfo(
model_id=model_id, username=username
)
else:
@@ -307,7 +307,7 @@ def get_individual_post(id):
log.debug(f"[bold]individual post headers:[/bold] {r.headers}")


- def get_after(model_id, username, forced_after=None):
+ async def get_after(model_id, username, forced_after=None):
if forced_after != None:
return forced_after
elif read_args.retriveArgs().after == 0:
@@ -331,7 +331,7 @@ def get_after(model_id, username, forced_after=None):
missing_items = list(sorted(missing_items, key=lambda x: arrow.get(x[12])))
if len(missing_items) == 0:
log.debug("Using last db date because,all downloads in db marked as downloaded")
- return operations.get_last_timeline_date(model_id=model_id, username=username)
+ return await operations.get_last_timeline_date(model_id=model_id, username=username)
else:
log.debug(
f"Setting date slightly before earliest missing item\nbecause {len(missing_items)} posts in db are marked as undownloaded"
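
timeline.py mirrors the archive and messages changes, and get_split_array (now async because it reads the cache from the DB itself) feeds get_tasks one scan task per chunk. A simplified synchronous sketch of the chunking step alone, with min_posts = 50 as in the diff and the DB read factored out; the real function's sizing logic may differ:

    def get_split_array_chunks(oldtimeline, after, min_posts=50):
        # keep dict rows at or after the resume point, oldest first
        rows = sorted(
            (r for r in oldtimeline if r and r.get("created_at", 0) >= after),
            key=lambda r: r["created_at"],
        )
        # each slice of up to min_posts rows becomes one scan task
        return [rows[i : i + min_posts] for i in range(0, len(rows), min_posts)]
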