Reviewer notes (text before the first "diff --git" header is ignored by `git apply`).

Layout: line breaks and indentation were restored from a whitespace-collapsed
copy of this patch. Indentation and the position of blank context lines are
inferred from the hunk line counts and Python structure -- TODO confirm with
`git apply --check` against the target tree.

Changes made to the added ("+") lines during review:
  * download/main_downloadbatch.py: the `async for chunk in r.iter_chunked(...)`
    header is kept as context; the collapsed copy removed it while the new body
    still used `chunk` and `break` (hunk new-length is therefore 29, not 28).
    NOTE(review): assumes the chunk loop was meant to stay -- confirm.
  * download/downloadbatch.py: download_progress() now returns on the None
    marker sent by send_msg_alt(None); the unused `count` exit was dropped.
  * download/downloadbatch.py: the download-thread wait loop compares against
    download_queue_threads (was queue_threads) and logs the refreshed list.
  * download/utils/send/message.py: send_msg_alt() declares the global it uses.
  * download/utils/progress/chunk.py: change_get_ideal_chunk_size() no longer
    raises TypeError when count/update_count are left at their None defaults.
The post-image hashes on the `index` lines of those four files are stale.

diff --git a/ofscraper/classes/sessionmanager/chunk.py b/ofscraper/classes/sessionmanager/chunk.py
new file mode 100644
index 000000000..1cf710296
--- /dev/null
+++ b/ofscraper/classes/sessionmanager/chunk.py
@@ -0,0 +1,14 @@
+class responseChunk:
+    def __init__(self, response,total):
+        self._response = response
+        self._total=total
+        self._count=0
+
+
+    async def read_chunk(self,chunk_size):
+        r=self._response
+        if r.eof():
+            return None
+        chunk=await r.read_(chunk_size)
+        return chunk
+
diff --git a/ofscraper/classes/sessionmanager/sessionmanager.py b/ofscraper/classes/sessionmanager/sessionmanager.py
index 67567c6e8..50500e398 100644
--- a/ofscraper/classes/sessionmanager/sessionmanager.py
+++ b/ofscraper/classes/sessionmanager/sessionmanager.py
@@ -593,6 +593,7 @@ async def _aio_funct(self, method, *args, **kwargs):
         r.request = r.request_info
         r.status_code = r.status
         r.read_ = r.content.read
+        r.eof=r.content.at_eof
         return r
 
     async def factoryasync(self, input):
diff --git a/ofscraper/const/values/download.py b/ofscraper/const/values/download.py
index 093f21603..35bf52d08 100644
--- a/ofscraper/const/values/download.py
+++ b/ofscraper/const/values/download.py
@@ -16,5 +16,7 @@
 MPD_FORCE_KEY = True
 API_FORCE_KEY = False
 PROFILE_FORCE_KEY = False
-MULTIPROGRESS_JOB_UPDATE_FREQ=3
-OVERALL_MULTI_PROGRESS_THREAD_SLEEP=.2
\ No newline at end of file
+MULTIPROGRESS_JOB_UPDATE_FREQ=2
+PROGRESS_JOB_UPDATE_FREQ=5
+OVERALL_MULTI_PROGRESS_THREAD_SLEEP=.1
+JOB_MULTI_PROGRESS_THREAD_SLEEP=.1
\ No newline at end of file
diff --git a/ofscraper/const/values/req/req.py b/ofscraper/const/values/req/req.py
index b32e931f0..fc8b356d4 100644
--- a/ofscraper/const/values/req/req.py
+++ b/ofscraper/const/values/req/req.py
@@ -13,6 +13,7 @@
 MAX_CHUNK_SIZE = 1024 * 1024 * 10
 MIN_CHUNK_SIZE = 4 * 1024
 CHUNK_UPDATE_COUNT = 12
+CHUNK_SIZE_UPDATE_COUNT=15
 
 REQ_SEMAPHORE_MULTI = 5
 SCRAPE_PAID_SEMS = 10
@@ -49,5 +50,5 @@
 
 
 # ideal chunk
-CHUNK_MEMORY_SPLIT = 64
-CHUNK_FILE_SPLIT = 64
+CHUNK_MEMORY_SPLIT = 128
+CHUNK_FILE_SPLIT = 128
diff --git a/ofscraper/download/alt_download.py b/ofscraper/download/alt_download.py
index 1563ddfa3..73f50135e 100644
--- a/ofscraper/download/alt_download.py
+++ b/ofscraper/download/alt_download.py
@@ -50,8 +50,8 @@
     temp_file_logger,
 )
 from ofscraper.download.utils.progress.chunk import (
-    get_ideal_chunk_size,
     get_update_count,
+    get_ideal_chunk_size
 )
 from ofscraper.download.utils.retries import get_download_retries
 from ofscraper.download.utils.send.chunk import send_chunk_msg
@@ -233,12 +233,12 @@ async def download_fileobject_writer(total, l, ele, placeholderObj):
     fileobject = await aiofiles.open(placeholderObj.tempfilepath, "ab").__aenter__()
     download_sleep = constants.getattr("DOWNLOAD_SLEEP")
     chunk_size = get_ideal_chunk_size(total, placeholderObj.tempfilepath)
-    update_count = get_update_count(total, placeholderObj.tempfilepath, chunk_size)
+    update_count = get_update_count(total, placeholderObj.tempfilepath,chunk_size)
     count = 1
     try:
         async for chunk in l.iter_chunked(chunk_size):
-            send_chunk_msg(ele, total, placeholderObj)
             await fileobject.write(chunk)
+            send_chunk_msg(ele, total, placeholderObj)
             await send_bar_msg(
                 partial(
                     progress_updater.update_download_job_task,
diff --git a/ofscraper/download/alt_downloadbatch.py b/ofscraper/download/alt_downloadbatch.py
index 29145d186..f1f787d93 100644
--- a/ofscraper/download/alt_downloadbatch.py
+++ b/ofscraper/download/alt_downloadbatch.py
@@ -38,7 +38,7 @@
 )
 from ofscraper.download.utils.progress.chunk import (
     get_ideal_chunk_size,
-    get_update_count,
+    get_update_count
 )
 from ofscraper.download.utils.resume import get_resume_header, get_resume_size
 from ofscraper.download.utils.retries import get_download_retries
@@ -259,7 +259,7 @@ async def download_fileobject_writer(total, req, ele, placeholderObj):
             )
         )
         chunk_size = get_ideal_chunk_size(total, placeholderObj.tempfilepath)
-        update_count = get_update_count(total, placeholderObj.tempfilepath, chunk_size)
+        update_count = get_update_count(total, placeholderObj.tempfilepath,chunk_size)
 
         async for chunk in req.iter_chunked(chunk_size):
             await fileobject.write(chunk)
diff --git a/ofscraper/download/downloadbatch.py b/ofscraper/download/downloadbatch.py
index 9c2106d07..7c6eb5425 100644
--- a/ofscraper/download/downloadbatch.py
+++ b/ofscraper/download/downloadbatch.py
@@ -43,10 +43,9 @@
 from ofscraper.download.utils.metadata import metadata
 from ofscraper.download.utils.paths.paths import addGlobalDir, setDirectoriesDate
 from ofscraper.download.utils.progress.progress import convert_num_bytes
-from ofscraper.download.utils.send.message import send_msg
+from ofscraper.download.utils.send.message import send_msg,send_msg_alt
 from ofscraper.download.utils.workers import get_max_workers
 from ofscraper.utils.context.run_async import run
-from ofscraper.utils.live.progress import multi_download_job_progress
 import ofscraper.utils.constants as constants
 
 
@@ -73,6 +72,7 @@ def process_dicts(username, model_id, filtered_medialist):
         split_val = min(4, num_proc)
         log.debug(f"Number of download threads: {num_proc}")
         connect_tuples = [AioPipe() for _ in range(num_proc)]
+        alt_connect_tuples = [AioPipe() for _ in range(num_proc)]
         shared = list(
             more_itertools.chunked([i for i in range(num_proc)], split_val)
         )
@@ -101,6 +101,7 @@ def process_dicts(username, model_id, filtered_medialist):
                     logqueues_[i // split_val],
                     otherqueues_[i // split_val],
                     connect_tuples[i][1],
+                    alt_connect_tuples[i][1],
                     dates.getLogDate(),
                     selector.get_ALL_SUBS_DICT(),
                     read_args.retriveArgs(),
@@ -138,7 +139,18 @@ def process_dicts(username, model_id, filtered_medialist):
             )
             for i in range(num_proc)
         ]
+        download_queue_threads = [
+            threading.Thread(
+                target=asyncio.run,
+                args=(download_progress(
+                    alt_connect_tuples[i][0],
+                ),),
+                daemon=True,
+            )
+            for i in range(num_proc)
+        ]
         [thread.start() for thread in queue_threads]
+        [thread.start() for thread in download_queue_threads]
         log.debug(f"Initial Queue Threads: {queue_threads}")
         log.debug(f"Number of initial Queue Threads: {len(queue_threads)}")
 
@@ -155,6 +167,19 @@ def process_dicts(username, model_id, filtered_medialist):
             for thread in queue_threads:
                 thread.join(timeout=0.1)
             time.sleep(0.5)
+        while True:
+            newqueue_threads = list(
+                filter(lambda x: x and x.is_alive(), download_queue_threads)
+            )
+            if len(newqueue_threads) != len(download_queue_threads):
+                log.debug(f"Remaining Download Queue Threads: {newqueue_threads}")
+                log.debug(f"Number of Download Queue Threads: {len(newqueue_threads)}")
+            if len(download_queue_threads) == 0:
+                break
+            download_queue_threads = newqueue_threads
+            for thread in download_queue_threads:
+                thread.join(timeout=0.1)
+            time.sleep(0.5)
         log.debug(f"Intial Log Threads: {log_threads}")
         log.debug(f"Number of intial Log Threads: {len(log_threads)}")
         while True:
@@ -212,6 +237,20 @@ def process_dicts(username, model_id, filtered_medialist):
 
 
 
+async def download_progress(pipe_):
+    # shared globals
+    sleep = constants.getattr("JOB_MULTI_PROGRESS_THREAD_SLEEP")
+    while True:
+        time.sleep(sleep)
+        results = pipe_.recv()
+        if not isinstance(results, list):
+            results = [results]
+        for result in results:
+            # None is the end-of-stream marker sent by send_msg_alt(None)
+            if result is None:
+                return
+            await ajob_progress_helper(result)
+
 
 def queue_process(pipe_, task1, total):
     count = 0
@@ -300,6 +339,7 @@ def process_dict_starter(
     p_logqueue_,
     p_otherqueue_,
     pipe_,
+    pipe2_,
     dateDict,
     userNameList,
     argsCopy,
@@ -308,6 +348,7 @@ def process_dict_starter(
         dateDict,
         userNameList,
         pipe_,
+        pipe2_,
         logger.get_shared_logger(
             main_=p_logqueue_, other_=p_otherqueue_, name=f"shared_{os.getpid()}"
         ),
@@ -336,10 +377,25 @@ def process_dict_starter(
 def job_progress_helper(funct):
     try:
         funct()
+    # probably handled by another thread
+    except KeyError:
+        pass
     except Exception as E:
         logging.getLogger("shared").debug(E)
 
 
+async def ajob_progress_helper(funct):
+    try:
+        await asyncio.get_event_loop().run_in_executor(
+            None,
+            funct,
+        )
+    # probably handled by another thread
+    except KeyError:
+        pass
+    except Exception as E:
+        logging.getLogger("shared").debug(E)
+
 def setpriority():
     os_used = platform.system()
     process = psutil.Process(
@@ -422,6 +478,7 @@ async def process_dicts_split(username, model_id, medialist):
     common_globals.log.debug("other thread closed")
     await send_msg({"dir_update": common_globals.localDirSet})
     await send_msg(None)
+    await send_msg_alt(None)
 
 
 def pid_log_helper():
diff --git a/ofscraper/download/main_download.py b/ofscraper/download/main_download.py
index 183c68e49..a35c4146f 100644
--- a/ofscraper/download/main_download.py
+++ b/ofscraper/download/main_download.py
@@ -43,8 +43,8 @@
 )
 from ofscraper.download.utils.metadata import force_download
 from ofscraper.download.utils.progress.chunk import (
-    get_ideal_chunk_size,
     get_update_count,
+    get_ideal_chunk_size
 )
 from ofscraper.download.utils.resume import get_resume_header, get_resume_size
 from ofscraper.download.utils.retries import get_download_retries
@@ -210,7 +210,7 @@ async def download_fileobject_writer(r, ele, tempholderObj, placeholderObj, tota
         fileobject = await aiofiles.open(tempholderObj.tempfilepath, "ab").__aenter__()
         download_sleep = constants.getattr("DOWNLOAD_SLEEP")
         chunk_size = get_ideal_chunk_size(total, tempholderObj.tempfilepath)
-        update_count = get_update_count(total, tempholderObj.tempfilepath, chunk_size)
+        update_count = get_update_count(total, tempholderObj.tempfilepath,chunk_size)
         count = 1
         async for chunk in r.iter_chunked(chunk_size):
             await fileobject.write(chunk)
diff --git a/ofscraper/download/main_downloadbatch.py b/ofscraper/download/main_downloadbatch.py
index b1b44644b..18e8878f3 100644
--- a/ofscraper/download/main_downloadbatch.py
+++ b/ofscraper/download/main_downloadbatch.py
@@ -44,8 +44,9 @@
 )
 from ofscraper.download.utils.metadata import force_download
 from ofscraper.download.utils.progress.chunk import (
-    get_ideal_chunk_size,
+    change_get_ideal_chunk_size,
     get_update_count,
+    get_ideal_chunk_size,
 )
 from ofscraper.download.utils.resume import get_resume_header, get_resume_size
 from ofscraper.download.utils.retries import get_download_retries
@@ -233,26 +234,29 @@ async def download_fileobject_writer(r, ele, total, tempholderObj, placeholderOb
             partial(progress_updater.update_download_multi_job_task, ele.id, visible=True)
         )
         chunk_size = get_ideal_chunk_size(total, tempholderObj.tempfilepath)
-        update_count = get_update_count(total, tempholderObj.tempfilepath, chunk_size)
+        update_count = get_update_count(total, tempholderObj.tempfilepath,chunk_size)
         count = 1
 
         async for chunk in r.iter_chunked(chunk_size):
-            send_chunk_msg(ele, total, tempholderObj)
-            await fileobject.write(chunk)
-            await send_bar_msg_batch(
-                partial(
-                    progress_updater.update_download_multi_job_task,
-                    ele.id,
-                    completed=pathlib.Path(tempholderObj.tempfilepath)
-                    .absolute()
-                    .stat()
-                    .st_size,
-                ),
-                count,
-                update_count,
-            )
-            count += 1
-            (await asyncio.sleep(download_sleep)) if download_sleep else None
+            try:
+                await fileobject.write(chunk)
+                send_chunk_msg(ele, total, tempholderObj)
+                await send_bar_msg_batch(
+                    partial(
+                        progress_updater.update_download_multi_job_task,
+                        ele.id,
+                        completed=pathlib.Path(tempholderObj.tempfilepath)
+                        .absolute()
+                        .stat()
+                        .st_size,
+                    ),
+                    count,
+                    update_count,
+                )
+                count += 1
+                (await asyncio.sleep(download_sleep)) if download_sleep else None
+            except EOFError:
+                break
     except Exception as E:
         # reset download data
         raise E
diff --git a/ofscraper/download/utils/alt/data.py b/ofscraper/download/utils/alt/data.py
index 5431956f0..52a994e84 100644
--- a/ofscraper/download/utils/alt/data.py
+++ b/ofscraper/download/utils/alt/data.py
@@ -41,7 +41,7 @@ async def resume_data_handler_alt(data, item, ele, placeholderObj, batch=False):
         elif not batch:
             total_change_helper(None, total)
         elif batch:
-            batch_total_change_helper(None, total)
+            await batch_total_change_helper(None, total)
         return item, True
     elif total != resume_size:
         return item, False
diff --git a/ofscraper/download/utils/general.py b/ofscraper/download/utils/general.py
index 90c6b3953..199e93e44 100644
--- a/ofscraper/download/utils/general.py
+++ b/ofscraper/download/utils/general.py
@@ -37,12 +37,12 @@ def add_additional_data(placeholderObj, ele):
     add_path(placeholderObj, ele)
 
 
-def subProcessVariableInit(dateDict, userList, pipeCopy, logCopy, argsCopy):
+def subProcessVariableInit(dateDict, userList, pipeCopy, pipeAltCopy, logCopy, argsCopy):
     common_globals.reset_globals()
     write_args.setArgs(argsCopy)
     dates.setLogDate(dateDict)
     selector.set_ALL_SUBS_DICT(userList)
-    common_globals.process_split_globals(pipeCopy, logCopy)
+    common_globals.process_split_globals(pipeCopy, pipeAltCopy,logCopy)
     set_send_msg()
 
 
diff --git a/ofscraper/download/utils/globals.py b/ofscraper/download/utils/globals.py
index dc81cafd1..1e9b41fcd 100644
--- a/ofscraper/download/utils/globals.py
+++ b/ofscraper/download/utils/globals.py
@@ -73,12 +73,16 @@ def set_up_contexvars():
     innerlog = contextvars.ContextVar("innerlog")
 
 
-def process_split_globals(pipeCopy, logCopy):
+def process_split_globals(pipeCopy, pipeAltCopy,logCopy):
     global pipe
     global log
     global pipe_lock
+    global pipe_alt_lock
     global lock_pool
+    global pipe_alt
     pipe = pipeCopy
+    pipe_alt=pipeAltCopy
     log = logCopy
     pipe_lock = threading.Lock()
+    pipe_alt_lock = threading.Lock()
     lock_pool = ThreadPoolExecutor(max_workers=1)
diff --git a/ofscraper/download/utils/main/data.py b/ofscraper/download/utils/main/data.py
index 25c482b44..2c1b32fda 100644
--- a/ofscraper/download/utils/main/data.py
+++ b/ofscraper/download/utils/main/data.py
@@ -52,6 +52,6 @@ async def resume_data_handler_main(data, ele, tempholderObj, batch=False):
         elif not batch:
             total_change_helper(None, total)
         elif batch:
-            batch_total_change_helper(None, total)
+            await batch_total_change_helper(None, total)
         check = True
     return total, placeholderObj, check
diff --git a/ofscraper/download/utils/progress/chunk.py b/ofscraper/download/utils/progress/chunk.py
index 307ccecc9..ba2107aad 100644
--- a/ofscraper/download/utils/progress/chunk.py
+++ b/ofscraper/download/utils/progress/chunk.py
@@ -17,6 +17,12 @@
 
 import ofscraper.utils.constants as constants
 
+def change_get_ideal_chunk_size(total_size, curr_file,count=None,chunk_size=None,update_count=None):
+    if not chunk_size or not count or not update_count:
+        chunk_size = get_ideal_chunk_size(total_size, curr_file)
+    elif count%(update_count*5)==0:
+        chunk_size = get_ideal_chunk_size(total_size, curr_file)
+    return chunk_size
 
 def get_ideal_chunk_size(total_size, curr_file):
     """
@@ -42,10 +48,10 @@ def get_ideal_chunk_size(total_size, curr_file):
 
     # Target a chunk size that utilizes a reasonable portion of available memory
     max_chunk_size = min(
-        available_memory // 512, constants.getattr("MAX_CHUNK_SIZE")
+        available_memory // constants.getattr("CHUNK_MEMORY_SPLIT"), constants.getattr("MAX_CHUNK_SIZE")
     )  # Max 10MB
     # Adjust chunk size based on file size (consider smaller sizes for larger files, with minimum)
-    ideal_chunk_size = min(max_chunk_size, file_size // 512)
+    ideal_chunk_size = min(max_chunk_size, file_size // constants.getattr("CHUNK_FILE_SPLIT"))
     ideal_chunk_size = max(
         ideal_chunk_size - (ideal_chunk_size % 4096),
         constants.getattr("MIN_CHUNK_SIZE"),
diff --git a/ofscraper/download/utils/send/message.py b/ofscraper/download/utils/send/message.py
index bfc5276fd..6cdefee85 100644
--- a/ofscraper/download/utils/send/message.py
+++ b/ofscraper/download/utils/send/message.py
@@ -4,18 +4,28 @@
 import ofscraper.download.utils.globals as common_globals
 
 
+
 def set_send_msg():
     global send_msg_helper
+    global send_msg_alt_helper
+
     if platform.system() != "Windows":
         send_msg_helper = send_msg_unix
+        send_msg_alt_helper = send_msg_alt_unix
     else:
         send_msg_helper = send_msg_win
+        send_msg_alt_helper = send_msg_alt_win
+
 
 
 async def send_msg(msg):
     global send_msg_helper
     await send_msg_helper(msg)
 
+async def send_msg_alt(msg):
+    global send_msg_alt_helper
+    await send_msg_alt_helper(msg)
+
 
 async def send_msg_win(msg):
     loop = asyncio.get_event_loop()
@@ -27,6 +37,19 @@ async def send_msg_win(msg):
     finally:
         common_globals.pipe_lock.release()
 
+async def send_msg_alt_win(msg):
+    loop = asyncio.get_event_loop()
+    await loop.run_in_executor(
+        common_globals.lock_pool, common_globals.pipe_alt_lock.acquire
+    )
+    try:
+        await common_globals.pipe_alt.coro_send(msg)
+    finally:
+        common_globals.pipe_alt_lock.release()
 
 async def send_msg_unix(msg):
     await common_globals.pipe.coro_send(msg)
+
+
+async def send_msg_alt_unix(msg):
+    await common_globals.pipe_alt.coro_send(msg)
diff --git a/ofscraper/download/utils/send/send_bar_msg.py b/ofscraper/download/utils/send/send_bar_msg.py
index 7c77c5b13..758b81c08 100644
--- a/ofscraper/download/utils/send/send_bar_msg.py
+++ b/ofscraper/download/utils/send/send_bar_msg.py
@@ -1,21 +1,12 @@
 import asyncio
 
 import ofscraper.download.utils.globals as common_globals
-import ofscraper.utils.settings as settings
-from ofscraper.download.utils.send.message import send_msg
+from ofscraper.download.utils.send.message import send_msg_alt
 
 
 async def send_bar_msg_batch(msg, count, update_count):
-    if count % update_count != 0:
-        return
-    elif not settings.get_download_bars():
-        return
-    await send_msg(msg)
+    await send_msg_alt(msg)
 
 
 async def send_bar_msg(func, count, update_count):
-    if count % update_count != 0:
-        return
-    elif not settings.get_download_bars():
-        return
     await asyncio.get_event_loop().run_in_executor(common_globals.thread, func)
diff --git a/ofscraper/utils/live/updater.py b/ofscraper/utils/live/updater.py
index 5ae24963c..1c2104ace 100644
--- a/ofscraper/utils/live/updater.py
+++ b/ofscraper/utils/live/updater.py
@@ -144,7 +144,7 @@ def add_download_job_multi_task(*args, file=None, **kwargs):
         settings.get_download_bars() and len(download_job_progress.tasks) < max_visible
     )
     task = multi_download_job_progress.add_task(
-        *args, visible=visible, start=visible, file=file, **kwargs
+        *args, visible=visible, start=True, file=file, **kwargs
     )
     if not visible:
         downloads_pending.add(task)
@@ -152,18 +152,14 @@ def add_download_job_multi_task(*args, file=None, **kwargs):
 
 
 def add_download_task(*args, **kwargs):
-    return download_overall_progress.add_task(*args, **kwargs)
+    return download_overall_progress.add_task(*args, start=True,**kwargs)
 
 
 def start_download_job_task(*args, **kwargs):
-    if not settings.get_download_bars():
-        return
     download_job_progress.start(*args, **kwargs)
 
 
 def start_download_multi_job_task(*args, **kwargs):
-    if not settings.get_download_bars():
-        return
     multi_download_job_progress.start(*args, **kwargs)
 
 
@@ -178,8 +174,6 @@ def update_download_job_task(*args, **kwargs):
 
 
 def update_download_multi_job_task(*args, **kwargs):
-    if not settings.get_download_bars():
-        return
     multi_download_job_progress.update(*args, **kwargs)
 
 