
Commit

fix scrape paid and simplify undupe on timeline
datawhores committed Apr 2, 2024
1 parent 1cfb329 commit b890759
Showing 6 changed files with 70 additions and 92 deletions.
32 changes: 16 additions & 16 deletions ofscraper/actions/process.py
@@ -38,6 +38,8 @@
 import ofscraper.utils.context.exit as exit
 import ofscraper.utils.context.stdout as stdout
 import ofscraper.utils.profiles.tools as profile_tools
+from ofscraper.utils.context.run_async import run
+

 log = logging.getLogger("shared")

@@ -75,7 +77,7 @@ def process_post():


 @exit.exit_wrapper
-async def process_post_user_first():
+def process_post_user_first():
     with scrape_context_manager():
         if not placeholder.check_uniquename():
             log.warning(
@@ -95,7 +97,7 @@ async def process_post_user_first():
         # log.info(f"Data retrival progressing on model {count+1}/{length}")


-async def process_user_first_helper(ele):
+def process_user_first_helper(ele):
     if constants.getattr("SHOW_AVATAR") and ele.avatar:
         log.warning(f"Avatar : {ele.avatar}")
     if bool(areas.get_download_area()):
@@ -116,27 +118,25 @@ async def process_user_first_helper(ele):


 def scrape_paid(user_dict=None):
-    output = []
-    user_dict = user_dict or {}
-    output.extend(OF.process_all_paid())
-    user_dict = user_dict or {}
-    [
-        user_dict.update(
-            {ele.post.model_id: user_dict.get(ele.post.model_id, []) + [ele]}
-        )
-        for ele in output
-    ]
+    user_dict =OF.process_all_paid()
     oldUsers = selector.get_ALL_SUBS_DICT()
-    for value in user_dict.values():
-        model_id = value[0].post.model_id
-        username = value[0].post.username
+    length=len(list(user_dict.keys()))
+    for count,value in enumerate(user_dict.values()):
+        model_id = value["model_id"]
+        username = value["username"]
+        posts=value["posts"]
+        medias=value["medias"]
+        log.warning(
+            f"Download paid content for {model_id}_{username} number:{count+1}/{length} models "
+        )
         selector.set_ALL_SUBS_DICTVManger(
             {username: models.Model(profile.scrape_profile(model_id))}
         )
         download.download_process(
             username,
             model_id,
-            value,
+            medias,
+            posts=posts
         )
     # restore og users
     selector.set_ALL_SUBS_DICT(oldUsers)
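Note: `scrape_paid` now consumes the per-model mapping that `OF.process_all_paid()` returns instead of flattening a media list itself. A minimal sketch of the shape it iterates over, with field names taken from this diff and made-up ids, username, and empty lists standing in for ofscraper's real `Post`/`Media` objects:

```python
# Hypothetical illustration of the mapping scrape_paid loops over.
# Field names come from the diff; the values here are placeholders.
user_dict = {
    1234: {
        "model_id": 1234,            # numeric model id
        "username": "examplemodel",  # placeholder username
        "posts": [],                 # deduped, filtered Post objects
        "medias": [],                # flattened Media objects from those posts
    },
}

length = len(list(user_dict.keys()))
for count, value in enumerate(user_dict.values()):
    print(f"{value['model_id']}_{value['username']} number:{count + 1}/{length}")
```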
63 changes: 26 additions & 37 deletions ofscraper/actions/scraper.py
@@ -17,8 +17,6 @@
 import traceback
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
-
-from rich.live import Live

 import ofscraper.api.archive as archive
 import ofscraper.api.highlights as highlights
 import ofscraper.api.labels as labels_api
@@ -36,12 +34,13 @@
 import ofscraper.utils.args.areas as areas
 import ofscraper.utils.args.read as read_args
 import ofscraper.utils.cache as cache
-import ofscraper.utils.console as console_
 import ofscraper.utils.constants as constants
 import ofscraper.utils.context.stdout as stdout
 import ofscraper.utils.progress as progress_utils
 import ofscraper.utils.system.free as free
 import ofscraper.utils.system.system as system
+from ofscraper.utils.context.run_async import run
+

 log = logging.getLogger("shared")

@@ -378,28 +377,15 @@ async def process_profile(username, c) -> list:


 @free.space_checker
+@run
 async def process_all_paid():
     with stdout.lowstdout():
-        paid_content = paid.get_all_paid_posts()
+        paid_content = await paid.get_all_paid_posts()
         user_dict = {}
-        post_array = []
-        [
-            user_dict.update(
-                {
-                    (ele.get("fromUser", None) or ele.get("author", None) or {}).get(
-                        "id"
-                    ): user_dict.get(
-                        (
-                            ele.get("fromUser", None) or ele.get("author", None) or {}
-                        ).get("id"),
-                        [],
-                    )
-                    + [ele]
-                }
-            )
-            for ele in paid_content
-        ]
-        output = []
+        for ele in paid_content:
+            user_id = ele.get("fromUser", {}).get("id") or ele.get("author", {}).get("id")
+            user_dict.setdefault(user_id, []).append(ele)
+        output = {}
         for model_id, value in user_dict.items():
             username = profile.scrape_profile(model_id).get("username")
             if username == "modeldeleted" and await operations.check_profile_table_exists(
@@ -410,36 +396,39 @@ async def process_all_paid():
                 or username
             )
         log.info(f"Processing {username}_{model_id}")
-        operations.table_init_create(model_id=model_id, username=username)
-        log.debug(f"Created table for {username}")
+        await operations.table_init_create(model_id=model_id, username=username)
+        log.debug(f"Created table for {username}_{model_id}")
         all_posts = list(
             map(
                 lambda x: posts_.Post(x, model_id, username, responsetype="paid"),
                 value,
             )
         )
-        new_dict = {}
-        for ele in all_posts:
-            new_dict[ele.id] = ele
-        new_posts = new_dict.values()
+        seen = set()
+        new_posts = [post for post in all_posts if post.id not in seen and not seen.add(post.id)]
+        new_medias=[item for post in new_posts for item in post.media]
+        new_medias=filters.filterMedia(new_medias)
+        new_posts=filters.filterPost(new_posts)
         await operations.make_post_table_changes(
             new_posts,
             model_id=model_id,
             username=username,
         )
-        temp = []
-        [temp.extend(post.media) for post in new_posts]
-        output.extend(temp)
-        log.debug(
-            f"[bold]Paid media count {username}_{model_id}[/bold] {len(temp)}"
+        await operations.batch_mediainsert(
+            new_medias,
+            model_id=model_id,
+            username=username,
+            downloaded=False,
         )
-        log.debug(f"Added Paid {len(temp)} media items from {username}_{model_id}")
-        post_array.extend(new_posts)
+
+        output[model_id]=dict(model_id=model_id,username=username,posts=new_posts,medias=new_medias)
+        log.debug(
+            f"[bold]Paid media count {username}_{model_id}[/bold] {len(new_medias)}")

     log.debug(
-        f"[bold]Paid Media for all models[/bold] {sum(map(lambda x:len(x.media),post_array))}"
+        f"[bold]Paid Media for all models[/bold] {sum(map(lambda x:len(x['medias']),output.values()))}"
     )
-    return filters.filterMedia(output)
+    return output


 @free.space_checker
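Note: the side-effecting list comprehension that grouped paid posts by user is replaced with a plain loop over `dict.setdefault`. A self-contained sketch of the idiom with made-up records (one caveat: unlike the old `or {}` chain, the new `ele.get("fromUser", {})` form assumes that when the key is present its value is a dict rather than an explicit `None`):

```python
# Group records by uploader id; setdefault creates each bucket on first use.
paid_content = [
    {"fromUser": {"id": 1}, "text": "a"},
    {"author": {"id": 2}, "text": "b"},
    {"fromUser": {"id": 1}, "text": "c"},
]

user_dict = {}
for ele in paid_content:
    user_id = ele.get("fromUser", {}).get("id") or ele.get("author", {}).get("id")
    user_dict.setdefault(user_id, []).append(ele)

assert [len(v) for v in user_dict.values()] == [2, 1]  # user 1 has two posts
```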
48 changes: 19 additions & 29 deletions ofscraper/api/timeline.py
@@ -82,25 +82,22 @@ async def get_timeline_media_progress(model_id, username, forced_after=None, c=N
         overall_progress.remove_task(page_task)
     progress_utils.timeline_layout.visible = False

-    unduped = {}
     log.debug(f"[bold]Timeline Count with Dupes[/bold] {len(responseArray)} found")
-    for post in responseArray:
-        id = post["id"]
-        if unduped.get(id):
-            continue
-        unduped[id] = post
+    seen = set()
+    new_posts = [post for post in responseArray if post["id"] not in seen and not seen.add(post["id"])]

-    log.trace(f"timeline dupeset postids {list(unduped.keys())}")
+    log.trace(f"timeline dupeset postids {list(map(lambda x:x.get('id'),new_posts))}")
     log.trace(
         "post raw unduped {posts}".format(
             posts="\n\n".join(
-                list(map(lambda x: f"undupedinfo timeline: {str(x)}", unduped))
+                list(map(lambda x: f"undupedinfo timeline: {str(x)}", new_posts))
             )
         )
     )
-    log.debug(f"[bold]Timeline Count without Dupes[/bold] {len(unduped)} found")
-    set_check(unduped, model_id, after)
-    return list(unduped.values())
+    log.debug(f"[bold]Timeline Count without Dupes[/bold] {len(new_posts)} found")
+    set_check(new_posts, model_id, after)
+    return list(new_posts)


 @run
@@ -129,7 +126,7 @@ async def get_timeline_media(model_id, username, forced_after=None, c=None):
     oldtimeline = list(filter(lambda x: x != None, oldtimeline))
     after = await get_after(model_id, username, forced_after)

-    splitArrays = get_split_array(model_id, username, after)
+    splitArrays = await get_split_array(model_id, username, after)
     tasks = get_tasks(splitArrays, c, model_id, job_progress, after)

     while bool(tasks):
@@ -154,25 +151,21 @@ async def get_timeline_media(model_id, username, forced_after=None, c=None):
                 log.traceback_(traceback.format_exc())
         tasks = new_tasks

-    unduped = {}
     log.debug(f"[bold]Timeline Count with Dupes[/bold] {len(responseArray)} found")
-    for post in responseArray:
-        id = post["id"]
-        if unduped.get(id):
-            continue
-        unduped[id] = post
+    seen = set()
+    new_posts = [post for post in responseArray if post["id"] not in seen and not seen.add(post["id"])]

-    log.trace(f"timeline dupeset postids {list(unduped.keys())}")
+    log.trace(f"timeline dupeset postids {list(map(lambda x:x.get('id'),new_posts))}")
     log.trace(
         "post raw unduped {posts}".format(
             posts="\n\n".join(
-                list(map(lambda x: f"undupedinfo timeline: {str(x)}", unduped))
+                list(map(lambda x: f"undupedinfo timeline: {str(x)}",new_posts))
             )
         )
     )
-    log.debug(f"[bold]Timeline Count without Dupes[/bold] {len(unduped)} found")
-    set_check(unduped, model_id, after)
-    return list(unduped.values())
+    log.debug(f"[bold]Timeline Count without Dupes[/bold] {len(new_posts)} found")
+    set_check(new_posts, model_id, after)
+    return list(new_posts)


 async def get_split_array(model_id, username, after):
@@ -280,14 +273,11 @@ def get_tasks(splitArrays, c, model_id, job_progress, after):

 def set_check(unduped, model_id, after):
     if not after:
-        newCheck = {}
-        for post in cache.get(f"timeline_check_{model_id}", default=[]) + list(
-            unduped.values()
-        ):
-            newCheck[post["id"]] = post
+        seen = set()
+        new_posts = [post for post in cache.get(f"timeline_check_{model_id}", default=[]) +unduped if post["id"] not in seen and not seen.add(post["id"])]
         cache.set(
             f"timeline_check_{model_id}",
-            list(newCheck.values()),
+            list(new_posts),
             expire=constants.getattr("DAY_SECONDS"),
         )
         cache.close()
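Note: all three dedupe sites above now share the same order-preserving `seen`-set comprehension. It works because `set.add` returns `None`, so `not seen.add(...)` always evaluates true and only runs for ids that already passed the `not in seen` test. A standalone sketch:

```python
# Order-preserving dedupe by id: the first occurrence of each id wins.
posts = [{"id": 1}, {"id": 2}, {"id": 1}, {"id": 3}]

seen = set()
new_posts = [
    post
    for post in posts
    if post["id"] not in seen and not seen.add(post["id"])  # add returns None
]

assert [p["id"] for p in new_posts] == [1, 2, 3]
```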
2 changes: 1 addition & 1 deletion ofscraper/commands/check.py
@@ -405,7 +405,7 @@ def get_all_found_media(user_name, posts):
 async def get_downloaded(user_name, model_id, paid=False):
     downloaded = {}

-    operations.table_init_create(model_id=model_id, username=user_name)
+    await operations.table_init_create(model_id=model_id, username=user_name)
     paid = await get_paid_ids(model_id, user_name) if paid else []
     [
         downloaded.update({ele: downloaded.get(ele, 0) + 1})
15 changes: 6 additions & 9 deletions ofscraper/db/operations_/profile.py
@@ -41,7 +41,7 @@
     )
 """
 profileDupeCheck = """
-SELECT * FROM profiles where user_id=(?)
+SELECT username FROM profiles where user_id=(?)
 """
 profileTableCheck = """
 SELECT name FROM sqlite_master WHERE type='table' AND name='profiles';
@@ -73,12 +73,11 @@ def get_profile_info(model_id=None, username=None, conn=None) -> list:
         return None
     with contextlib.closing(conn.cursor()) as cur:
         try:
-            modelinfo = cur.execute(
-                profileDupeCheck, (model_id,)
-            ).fetchall() or [(None,)]
-            conn.commit()
-            return modelinfo[0][-1]
-        except sqlite3.OperationalError as E:
+            cur.execute(
+                profileDupeCheck, ([model_id])
+            )
+            return (list(map(lambda x: x[0], cur.fetchall())) or [None] )[0]
+        except sqlite3.OperationalError:
             None
         except Exception as E:
             raise E
@@ -116,9 +115,7 @@ def check_profile_table_exists(model_id=None, username=None, conn=None):
         return False
     with contextlib.closing(conn.cursor()) as cur:
         if len(cur.execute(profileTableCheck).fetchall()) > 0:
-            conn.commit()
             return True
-        conn.commit()
         return False


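Note: `get_profile_info` now selects only `username` and returns the first row's value, with `or [None]` covering the no-row case. A minimal stand-in against an in-memory database showing both outcomes (the table layout here is hypothetical beyond the two columns the query touches):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE profiles (user_id INTEGER, username TEXT)")
conn.execute("INSERT INTO profiles VALUES (42, 'examplemodel')")

cur = conn.cursor()
cur.execute("SELECT username FROM profiles where user_id=(?)", ([42]))
print((list(map(lambda x: x[0], cur.fetchall())) or [None])[0])  # examplemodel

cur.execute("SELECT username FROM profiles where user_id=(?)", ([99]))
print((list(map(lambda x: x[0], cur.fetchall())) or [None])[0])  # None
```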
2 changes: 2 additions & 0 deletions ofscraper/download/download.py
@@ -12,6 +12,8 @@
 import ofscraper.utils.settings as settings
 import ofscraper.utils.system.system as system
 from ofscraper.download.common.common import textDownloader
+from ofscraper.utils.context.run_async import run
+


 def medialist_filter(medialist, model_id, username):
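Note: both action modules and this file now import `run` from `ofscraper.utils.context.run_async`, and `process_all_paid` gains a `@run` decorator; that is why `process_post_user_first` and `scrape_paid` drop their `async` keywords and call it directly. The decorator's implementation is not part of this diff; a hypothetical sketch of the usual shape of such a wrapper:

```python
# Hypothetical run_async-style decorator: expose an async function as a
# plain synchronous call (the real ofscraper `run` may differ).
import asyncio
import functools


def run(coro_func):
    @functools.wraps(coro_func)
    def wrapper(*args, **kwargs):
        # Drive the coroutine to completion on its own event loop.
        return asyncio.run(coro_func(*args, **kwargs))

    return wrapper


@run
async def process_all_paid():
    await asyncio.sleep(0)  # stand-in for the real paid-content scraping
    return {}


user_dict = process_all_paid()  # synchronous call, no await needed
```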
