get model_id for single model table during transfer
datawhores committed Apr 2, 2024
1 parent d82a1b3 commit 31bab26
Showing 46 changed files with 648 additions and 406 deletions.
22 changes: 8 additions & 14 deletions ofscraper/actions/process.py
@@ -40,7 +40,6 @@
import ofscraper.utils.profiles.tools as profile_tools
from ofscraper.utils.context.run_async import run


log = logging.getLogger("shared")


@@ -118,26 +117,21 @@ def process_user_first_helper(ele):


def scrape_paid(user_dict=None):
user_dict =OF.process_all_paid()
user_dict = OF.process_all_paid()
oldUsers = selector.get_ALL_SUBS_DICT()
length=len(list(user_dict.keys()))
for count,value in enumerate(user_dict.values()):
length = len(list(user_dict.keys()))
for count, value in enumerate(user_dict.values()):
model_id = value["model_id"]
username = value["username"]
posts=value["posts"]
medias=value["medias"]
posts = value["posts"]
medias = value["medias"]
log.warning(
f"Download paid content for {model_id}_{username} number:{count+1}/{length} models "
)
f"Download paid content for {model_id}_{username} number:{count+1}/{length} models "
)
selector.set_ALL_SUBS_DICTVManger(
{username: models.Model(profile.scrape_profile(model_id))}
)
download.download_process(
username,
model_id,
medias,
posts=posts
)
download.download_process(username, model_id, medias, posts=posts)
# restore og users
selector.set_ALL_SUBS_DICT(oldUsers)
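
For context, the user_dict consumed by scrape_paid above maps each model id to a dict with model_id, username, posts, and medias keys; it is returned by OF.process_all_paid(), the coroutine reworked below in ofscraper/actions/scraper.py. A minimal, runnable sketch of the loop shape it drives, with hypothetical placeholder data and a stub in place of the real downloader:

# Sketch only: placeholder data and a stub downloader, not ofscraper's actual objects.
def download_process(username, model_id, medias, posts=None):
    # stand-in for ofscraper's download step; just report what it would do
    print(f"would download {len(medias)} media items for {username} ({model_id})")

user_dict = {
    111: {"model_id": 111, "username": "model_a", "posts": ["p1", "p2"], "medias": ["m1"]},
    222: {"model_id": 222, "username": "model_b", "posts": ["p3"], "medias": ["m2", "m3"]},
}

length = len(user_dict)
for count, value in enumerate(user_dict.values()):
    model_id = value["model_id"]
    username = value["username"]
    posts = value["posts"]
    medias = value["medias"]
    print(f"Download paid content for {model_id}_{username} number:{count + 1}/{length} models")
    download_process(username, model_id, medias, posts=posts)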

51 changes: 33 additions & 18 deletions ofscraper/actions/scraper.py
@@ -41,7 +41,6 @@
import ofscraper.utils.system.system as system
from ofscraper.utils.context.run_async import run


log = logging.getLogger("shared")


@@ -97,7 +96,7 @@ async def process_paid_post(model_id, username, c):
paid_content,
model_id=model_id,
username=username,
)
)
output = []
[output.extend(post.media) for post in paid_content]
log.debug(f"[bold]Paid media count without locked[/bold] {len(output)}")
@@ -218,7 +217,9 @@ async def process_timeline_posts(model_id, username, c):
timeline_posts,
)
)
timeline_only_posts=list(filter(lambda x:x.regular_timeline,timeline_posts))
timeline_only_posts = list(
filter(lambda x: x.regular_timeline, timeline_posts)
)

await operations.make_post_table_changes(
timeline_only_posts,
@@ -259,7 +260,9 @@
async def process_archived_posts(model_id, username, c):
try:
with stdout.lowstdout():
archived_posts = await archive.get_archived_posts_progress(model_id, username, c=c)
archived_posts = await archive.get_archived_posts_progress(
model_id, username, c=c
)
archived_posts = list(
map(
lambda x: posts_.Post(x, model_id, username, "archived"),
@@ -309,9 +312,7 @@ async def process_pinned_posts(model_id, username, c):
with stdout.lowstdout():
pinned_posts = await pinned.get_pinned_posts_progress(model_id, c=c)
pinned_posts = list(
map(
lambda x: posts_.Post(x, model_id, username), pinned_posts
)
map(lambda x: posts_.Post(x, model_id, username), pinned_posts)
)
await operations.make_post_table_changes(
pinned_posts,
@@ -386,16 +387,23 @@ async def process_all_paid():
paid_content = await paid.get_all_paid_posts()
user_dict = {}
for ele in paid_content:
user_id = ele.get("fromUser", {}).get("id") or ele.get("author", {}).get("id")
user_dict.setdefault(user_id, []).append(ele)
user_id = ele.get("fromUser", {}).get("id") or ele.get("author", {}).get(
"id"
)
user_dict.setdefault(user_id, []).append(ele)
output = {}
for model_id, value in user_dict.items():
username = profile.scrape_profile(model_id).get("username")
if username == "modeldeleted" and await operations.check_profile_table_exists(
model_id=model_id, username=username
if (
username == "modeldeleted"
and await operations.check_profile_table_exists(
model_id=model_id, username=username
)
):
username = (
await operations.get_profile_info(model_id=model_id, username=username)
await operations.get_profile_info(
model_id=model_id, username=username
)
or username
)
log.info(f"Processing {username}_{model_id}")
@@ -408,10 +416,14 @@ async def process_all_paid():
)
)
seen = set()
new_posts = [post for post in all_posts if post.id not in seen and not seen.add(post.id)]
new_medias=[item for post in new_posts for item in post.media]
new_medias=filters.filterMedia(new_medias)
new_posts=filters.filterPost(new_posts)
new_posts = [
post
for post in all_posts
if post.id not in seen and not seen.add(post.id)
]
new_medias = [item for post in new_posts for item in post.media]
new_medias = filters.filterMedia(new_medias)
new_posts = filters.filterPost(new_posts)
await operations.make_post_table_changes(
new_posts,
model_id=model_id,
@@ -424,9 +436,12 @@ async def process_all_paid():
downloaded=False,
)

output[model_id]=dict(model_id=model_id,username=username,posts=new_posts,medias=new_medias)
output[model_id] = dict(
model_id=model_id, username=username, posts=new_posts, medias=new_medias
)
log.debug(
f"[bold]Paid media count {username}_{model_id}[/bold] {len(new_medias)}")
f"[bold]Paid media count {username}_{model_id}[/bold] {len(new_medias)}"
)

log.debug(
f"[bold]Paid Media for all models[/bold] {sum(map(lambda x:len(x['medias']),output.values()))}"
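
The comprehensions reformatted in process_all_paid above, and again in ofscraper/api/archive.py below, rely on a common order-preserving deduplication idiom: set.add() returns None, so "not seen.add(x)" evaluates to True and only runs when the id has not been seen yet, thanks to short-circuiting. A standalone illustration with generic dicts (not the project's post objects):

posts = [{"id": 1}, {"id": 2}, {"id": 1}, {"id": 3}, {"id": 2}]

seen = set()
new_posts = [
    post
    for post in posts
    # the membership test short-circuits first; seen.add() records the id and returns None
    if post["id"] not in seen and not seen.add(post["id"])
]

print(new_posts)  # [{'id': 1}, {'id': 2}, {'id': 3}]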
55 changes: 33 additions & 22 deletions ofscraper/api/archive.py
@@ -6,6 +6,7 @@
\____/|__| /____ >\___ >__| (____ /\____/ \___ >__|
\/ \/ \/ \/
"""

import asyncio
import contextvars
import logging
@@ -25,9 +26,9 @@
import ofscraper.utils.cache as cache
import ofscraper.utils.constants as constants
import ofscraper.utils.progress as progress_utils
import ofscraper.utils.settings as settings
from ofscraper.classes.semaphoreDelayed import semaphoreDelayed
from ofscraper.utils.context.run_async import run
import ofscraper.utils.settings as settings

log = logging.getLogger("shared")
attempt = contextvars.ContextVar("attempt")
@@ -61,7 +62,7 @@ async def get_archived_posts_progress(model_id, username, forced_after=None, c=None):
oldarchived = list(filter(lambda x: x != None, oldarchived))
after = await get_after(model_id, username, forced_after)
splitArrays = get_split_array(oldarchived, username, after)
tasks=get_tasks(splitArrays, c, model_id, job_progress, after)
tasks = get_tasks(splitArrays, c, model_id, job_progress, after)

page_task = overall_progress.add_task(
f"Archived Content Pages Progress: {page_count}", visible=True
@@ -70,7 +71,7 @@
new_tasks = []
try:
async with asyncio.timeout(
constants.getattr("API_TIMEOUT_PER_TASKS") * max(len(tasks),2)
constants.getattr("API_TIMEOUT_PER_TASKS") * max(len(tasks), 2)
):
for task in asyncio.as_completed(tasks):
try:
@@ -96,7 +97,11 @@ async def get_archived_posts_progress(model_id, username, forced_after=None, c=None):
progress_utils.archived_layout.visible = False

seen = set()
new_posts = [post for post in responseArray if post["id"] not in seen and not seen.add(post["id"])]
new_posts = [
post
for post in responseArray
if post["id"] not in seen and not seen.add(post["id"])
]

log.trace(f"archive postids {list(map(lambda x:x.get('id'),new_posts))}")
log.trace(
@@ -111,7 +116,6 @@ async def get_archived_posts_progress(model_id, username, forced_after=None, c=None):
return new_posts



@run
async def get_archived_posts(model_id, username, forced_after=None, c=None):
tasks = []
@@ -124,7 +128,7 @@ async def get_archived_posts(model_id, username, forced_after=None, c=None):
if not read_args.retriveArgs().no_cache
else []
)
job_progress=None
job_progress = None

log.trace(
"oldarchive {posts}".format(
@@ -136,14 +140,13 @@ async def get_archived_posts(model_id, username, forced_after=None, c=None):
oldarchived = list(filter(lambda x: x != None, oldarchived))
after = await get_after(model_id, username, forced_after)
splitArrays = get_split_array(oldarchived, username, after)
tasks=get_tasks(splitArrays, c, model_id, job_progress, after)

tasks = get_tasks(splitArrays, c, model_id, job_progress, after)

while bool(tasks):
new_tasks = []
try:
async with asyncio.timeout(
constants.getattr("API_TIMEOUT_PER_TASKS") * max(len(tasks),2)
constants.getattr("API_TIMEOUT_PER_TASKS") * max(len(tasks), 2)
):
for task in asyncio.as_completed(tasks):
try:
@@ -163,7 +166,11 @@ async def get_archived_posts(model_id, username, forced_after=None, c=None):
tasks = new_tasks

seen = set()
new_posts = [post for post in responseArray if post["id"] not in seen and not seen.add(post["id"])]
new_posts = [
post
for post in responseArray
if post["id"] not in seen and not seen.add(post["id"])
]

log.trace(f"archive postids {list(map(lambda x:x.get('id'),new_posts))}")
log.trace(
@@ -230,7 +237,9 @@ def get_tasks(splitArrays, c, model_id, job_progress, after):
c,
model_id,
job_progress=job_progress,
required_ids=set([ele.get("created_at") for ele in splitArrays[i]]),
required_ids=set(
[ele.get("created_at") for ele in splitArrays[i]]
),
timestamp=splitArrays[i - 1][-1].get("created_at"),
offset=False,
)
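
The required_ids argument reformatted above is part of a chunked catch-up pattern in get_tasks: cached posts are split into arrays, and each continuation chunk becomes one fetch task that resumes from the previous chunk's last created_at and is expected to cover its own timestamps. A simplified sketch of how those two values could be derived per chunk (assumed behaviour with a hypothetical helper, not the actual get_tasks implementation, which also handles the initial chunk and the after boundary):

# Hypothetical helper: derive (timestamp, required_ids) for each continuation chunk.
def build_chunk_specs(split_arrays):
    specs = []
    for i in range(1, len(split_arrays)):
        specs.append(
            {
                # resume from the last post of the previous chunk
                "timestamp": split_arrays[i - 1][-1].get("created_at"),
                # timestamps this chunk is expected to cover
                "required_ids": {ele.get("created_at") for ele in split_arrays[i]},
            }
        )
    return specs

chunks = [
    [{"created_at": 100}, {"created_at": 110}],
    [{"created_at": 120}, {"created_at": 130}],
    [{"created_at": 140}],
]
for spec in build_chunk_specs(chunks):
    print(spec)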
@@ -278,8 +287,12 @@ def get_tasks(splitArrays, c, model_id, job_progress, after):
def set_check(unduped, model_id, after):
if not after:
seen = set()
new_posts = [post for post in cache.get(f"archived_check_{model_id}", default=[]) +unduped if post["id"] not in seen and not seen.add(post["id"])]

new_posts = [
post
for post in cache.get(f"archived_check_{model_id}", default=[]) + unduped
if post["id"] not in seen and not seen.add(post["id"])
]

cache.set(
f"archived_check_{model_id}",
new_posts,
@@ -291,16 +304,14 @@ def set_check(unduped, model_id, after):
async def get_after(model_id, username, forced_after=None):
if forced_after != None:
return forced_after
elif not settings.get_after_enabled():
elif not settings.get_after_enabled():
return 0
elif read_args.retriveArgs().after == 0:
return 0
elif read_args.retriveArgs().after:
return read_args.retriveArgs().after.float_timestamp

elif (
cache.get(f"{model_id}_full_archived_scrape")
):
elif cache.get(f"{model_id}_full_archived_scrape"):
log.info(
"Used --after previously. Scraping all archived posts required to make sure content is not missing"
)
@@ -310,10 +321,12 @@ async def get_after(model_id, username, forced_after=None):
log.debug("Setting date to zero because database is empty")
return 0
missing_items = list(filter(lambda x: x.get("downloaded") != 1, curr))
missing_items = list(sorted(missing_items, key=lambda x: x.get('posted_at') or 0))
missing_items = list(sorted(missing_items, key=lambda x: x.get("posted_at") or 0))
if len(missing_items) == 0:
log.debug("Using last db date because,all downloads in db marked as downloaded")
return await operations.get_last_archived_date(model_id=model_id, username=username)
return await operations.get_last_archived_date(
model_id=model_id, username=username
)
else:
log.debug(
f"Setting date slightly before earliest missing item\nbecause {len(missing_items)} posts in db are marked as undownloaded"
@@ -446,7 +459,5 @@ async def scrape_archived_posts(

finally:
sem.release()
job_progress.remove_task(
task
) if job_progress and task else None
job_progress.remove_task(task) if job_progress and task else None
return posts, new_tasks
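
Both archive fetch paths in this file wrap asyncio.as_completed in asyncio.timeout(API_TIMEOUT_PER_TASKS * max(len(tasks), 2)) and keep looping while completed tasks hand back follow-up tasks. A minimal runnable sketch of that loop shape (Python 3.11+ for asyncio.timeout), with a made-up timeout value and a dummy coroutine standing in for scrape_archived_posts; the per-task error handling of the original is omitted:

import asyncio

API_TIMEOUT_PER_TASKS = 5  # placeholder value; the real one comes from ofscraper's constants

async def fake_scrape(page):
    # stand-in for scrape_archived_posts: returns (posts, follow-up coroutines)
    await asyncio.sleep(0.01)
    follow_ups = [fake_scrape(page + 1)] if page < 2 else []
    return [f"post-{page}"], follow_ups

async def main():
    results = []
    tasks = [asyncio.create_task(fake_scrape(0))]
    while tasks:
        new_tasks = []
        # overall budget scales with the number of outstanding tasks (never below 2x one task)
        async with asyncio.timeout(API_TIMEOUT_PER_TASKS * max(len(tasks), 2)):
            for task in asyncio.as_completed(tasks):
                posts, follow_ups = await task
                results.extend(posts)
                new_tasks.extend(asyncio.create_task(c) for c in follow_ups)
        tasks = new_tasks
    print(results)  # ['post-0', 'post-1', 'post-2']

asyncio.run(main())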