diff --git a/ofscraper/download/downloadbatch.py b/ofscraper/download/downloadbatch.py
index 44864cdfa..cf1fb459c 100644
--- a/ofscraper/download/downloadbatch.py
+++ b/ofscraper/download/downloadbatch.py
@@ -375,11 +375,11 @@ async def ajob_progress_helper(funct):
     except Exception as E:
         logging.getLogger("shared").debug(E)
 
-async def consumer(queue):
+async def consumer(lock, aws):
     while True:
-        data = await queue.get()
+        async with lock:
+            data = aws.pop() if aws else None  # empty list signals shutdown
         if data is None:
-            queue.task_done()
             break
         else:
             try:
@@ -393,18 +393,9 @@
                 media_type = "skipped"
                 num_bytes_downloaded = 0
             await send_msg((media_type, num_bytes_downloaded, 0))
-            queue.task_done()
         await asyncio.sleep(1)
 
 
-async def producer(queue, aws, concurrency_limit):
-    for data in aws:
-        await queue.put(data)
-    for _ in range(concurrency_limit):
-        await queue.put(None)
-    await queue.join()  # Wait for all tasks to finish
-
-
 @run
 async def process_dicts_split(username, model_id, medialist):
     common_globals.log.debug(f"{pid_log_helper()} start inner thread for other loggers")
@@ -426,11 +417,11 @@
     for ele in medialist:
         aws.append((c, ele, model_id, username))
     concurrency_limit = get_max_workers()
-    queue = asyncio.Queue(maxsize=concurrency_limit)
+    lock = asyncio.Lock()
     consumers = [
-        asyncio.create_task(consumer(queue)) for _ in range(concurrency_limit)
+        asyncio.create_task(consumer(lock, aws)) for _ in range(concurrency_limit)
     ]
-    await producer(queue, aws, concurrency_limit)
+    await asyncio.gather(*consumers)  # wait for every consumer to drain the list
     common_globals.log.debug(f"{pid_log_helper()} download process thread closing")
 
     # send message directly
diff --git a/ofscraper/filters/media/main.py b/ofscraper/filters/media/main.py
index 78cd8dda4..49251c084 100644
--- a/ofscraper/filters/media/main.py
+++ b/ofscraper/filters/media/main.py
@@ -16,6 +16,8 @@ def filtermediaFinal(media, username, model_id):
     if "download" not in actions and not scrape_paid:
         log.debug("Skipping filtering because download not in actions")
         return media
+    log.info(f"Finalizing media filtering {username} {model_id} for download")
+
     count = 1
     trace_log_media(count, media, "initial media no filter:")
 
@@ -48,12 +50,13 @@
 
 
 def filtermediaAreas(media, **kwargs):
+    actions = read_args.retriveArgs().action
     scrape_paid = read_args.retriveArgs().scrape_paid
     if "download" not in actions and not scrape_paid:
         log.debug("Skipping filtering because download not in actions")
         return media
 
-
+    log.info("Initial media filtering for download")
     count = 1
     trace_log_media(count, media, "initial media no filter:")
 
@@ -128,9 +131,12 @@
     if "download" not in actions and not scrape_paid:
         log.debug("Skipping filtering because download not in actions")
         return post
+
     if "Text" not in settings.get_mediatypes():
         log.info("Skipping filtering Text not in mediatypes")
         return post
+    log.info("Filtering posts for text")
+
     count = 1
     trace_log_post(count, post, "initial posts no filter:")
     log.debug(f"filter {count}-> initial posts no filter count: {len(post)}")
@@ -171,10 +177,12 @@
 
 
 def post_filter_for_like(post, like=False):
+    actions = read_args.retriveArgs().action
     if "like" not in actions and "unlike" not in actions:
         log.debug("Skipping filtering because like and unlike not in actions")
         return post
+    log.info("Filtering posts for like action")
 
     log.debug(f"initial number of posts for {actions}")
     post = helpers.temp_post_filter(post)
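Review note on the downloadbatch.py change: the asyncio.Queue producer/consumer pair is replaced by N consumer tasks that pop jobs off a shared list under an asyncio.Lock and are awaited with asyncio.gather. Below is a minimal self-contained sketch of that pattern, with a hypothetical process() coroutine standing in for the real download call; the empty-list guard is what lets each consumer exit cleanly once the list is drained, with no None sentinels queued up front.

import asyncio

async def process(item):
    # hypothetical stand-in for the real per-item download coroutine
    await asyncio.sleep(0.01)
    return item

async def consumer(lock, jobs, results):
    while True:
        async with lock:
            # claim one job under the lock so two consumers never take
            # the same item; an empty list is the shutdown signal
            job = jobs.pop() if jobs else None
        if job is None:
            break
        results.append(await process(job))

async def main():
    jobs = list(range(10))
    results = []
    lock = asyncio.Lock()
    consumers = [
        asyncio.create_task(consumer(lock, jobs, results)) for _ in range(3)
    ]
    # gather() takes over the role of producer()/queue.join(): it returns
    # once every consumer has drained the list and broken out of its loop
    await asyncio.gather(*consumers)
    print(sorted(results))

asyncio.run(main())

Strictly speaking, a single-threaded event loop makes the check-and-pop atomic even without the lock, since there is no await between the test and the pop; keeping the lock, as the patch does, future-proofs the critical section in case a suspension point is ever added inside it.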