crawl mode
yzqzss committed Dec 3, 2024
1 parent b8dc434 commit d425391
Showing 4 changed files with 289 additions and 72 deletions.
89 changes: 86 additions & 3 deletions pdm.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -12,6 +12,7 @@ dependencies = [
"urlextract>=1.9.0",
"tqdm>=4.66.4",
"Pygments>=2.18.0",
"httpx>=0.27.0",
]
requires-python = ">=3.12"
readme = "README.md"
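The new httpx dependency backs the crawl mode introduced in src/googlrot/main.py below. As an illustration only (not part of this commit), here is a minimal sketch of resolving one goo.gl short link the way crawl mode does, i.e. without following redirects so the Location header itself can be recorded; the example URL is hypothetical:

import httpx

def resolve_short_link(url: str) -> tuple[int, str | None]:
    # Don't follow redirects: for goo.gl the useful data is the status code
    # plus the Location header of the 301/302 response itself.
    with httpx.Client(timeout=6) as client:
        r = client.get(url, follow_redirects=False)
        return r.status_code, r.headers.get("Location")

if __name__ == "__main__":
    print(resolve_short_link("https://goo.gl/example"))  # hypothetical short link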
184 changes: 115 additions & 69 deletions src/googlrot/main.py
@@ -9,6 +9,7 @@
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection

from pymongo.server_api import ServerApi
from googlrot.mode.code import code_mode
from googlrot.mode.repo import repo_mode
from googlrot.util.utils import POISON, urls_pusher

@@ -28,7 +29,7 @@ async def create_url_index(collection: AsyncIOMotorCollection):

def argparser():
parser = argparse.ArgumentParser()
parser.add_argument("--mode", choices=["repo", "code", "code_task_gen"], default="code")
parser.add_argument("--mode", choices=["repo", "code", "code_task_gen", "crawl"], default="crawl")
return parser.parse_args()

async def main():
@@ -73,73 +74,118 @@ def generate_prefixes():
print(len(prefixes))
await googl_perfix_queue_collection.insert_many(prefixes[:1000])
prefixes = prefixes[1000:]

async def code_mode(g: Github, googl_perfix_queue_collection: AsyncIOMotorCollection, googl_urls_collection: AsyncIOMotorCollection, bad_urls_collection: AsyncIOMotorCollection):
# for snippet in tqdm(g.search_code("goo.gl/uhr26v -is:archived -language:html AND NOT .goo.gl AND NOT /goo.gl\\/[^0-9a-zA-Z]/")):
from urlextract import URLExtract
from googlrot.url_type import BasicGooGlURL
extractor = URLExtract(extract_localhost=False)

googl_urls_queue = asyncio.Queue(maxsize=1000)
bad_urls_queue = asyncio.Queue(maxsize=1000)

task = await googl_perfix_queue_collection.find_one_and_update({"status": "TODO"}, {"$set": {"status": "PROCESSING"}})
if task is None:
logger.info("No more tasks")
return

prefix:str = task["prefix"]
assert len(prefix) == 3

logger.info(f"Processing prefix {prefix}")
OK_urls_count = 0
BAD_urls_count = 0
async with asyncio.TaskGroup() as group:
group.create_task(urls_pusher(urls_collection=googl_urls_collection, urls_queue=googl_urls_queue))
group.create_task(urls_pusher(urls_collection=bad_urls_collection, urls_queue=bad_urls_queue))

# for result in g.search_code(f"goo.gl/{prefix} -is:archived AND NOT .goo.gl AND NOT /goo.gl\\/[^0-9a-zA-Z]/"):
for result in g.search_code(f"goo.gl/{prefix} AND NOT is:fork"):
logger.info(f"Processing {result.repository.full_name} ==")

try:
content = result.decoded_content.decode("utf-8")
except GithubUnknownObjectException as e:
if e.status == 404:
logger.error(f"404: {e}, skip")
continue
else:
raise e
except UnicodeDecodeError:
logger.error(f"UnicodeDecodeError: {result.repository.full_name}, skip")
continue
print("conetnt: ", content[:256])
for url in extractor.gen_urls(content):
assert isinstance(url, str)
if "goo.gl/" not in url:
logger.debug(f"skipping url {url}")
continue
try:
stdurl = BasicGooGlURL(url)
await googl_urls_queue.put(stdurl)
logger.info(f"OK: {url} -> {stdurl}")
OK_urls_count += 1
except Exception as e:
logger.error(f"Error: {e} -> {url}")
await bad_urls_queue.put(url)
BAD_urls_count += 1

await googl_urls_queue.put(POISON)
await bad_urls_queue.put(POISON)

await googl_urls_queue.join()
await bad_urls_queue.join()

logger.info(f"OK: {OK_urls_count}, BAD: {BAD_urls_count}")

# set task to DONE
await googl_perfix_queue_collection.find_one_and_update({"prefix": prefix}, {"$set": {"status": "DONE"}})
logger.info(f"Finished prefix {prefix} -> DONE")
elif mode == "crawl":
import httpx
import re
from tqdm import tqdm
import time
import sqlite3

def init_db():
conn = sqlite3.connect("googl_urls.db")
c = conn.cursor()
# url, status, redirect_url
c.execute("CREATE TABLE IF NOT EXISTS urls (url TEXT PRIMARY KEY, status TEXT, redirect_url TEXT)")
conn.commit()
return conn
def get_crawled(conn: sqlite3.Connection)->set:
c = conn.cursor()
c.execute("SELECT url FROM urls")
return set([row[0] for row in c.fetchall()])

def write_url(conn: sqlite3.Connection, url, status, redirect_url):
c = conn.cursor()
c.execute("INSERT INTO urls (url, status, redirect_url) VALUES (?, ?, ?)", (url, status, redirect_url))
print(f"{url} -> {status} -> {redirect_url[:64]}")

def ratelimited(response: httpx.Response) -> bool:
if 'Location' not in response.headers:
return False
result_url = response.headers['Location']
response.content # read the response to allow connection reuse
return bool(re.search(r'^https?://(?:www\.)?google\.com/sorry/index\?continue=https://goo.gl/[^&]+&q=', result_url))
async def crawl():
conn = init_db()
googl_urls = []
with open("googl_urls.github.txt") as f:
for line in f:
googl_urls.append(line.strip())
googl_urls = set(googl_urls)
googl_urls.remove("") if "" in googl_urls else None
googl_urls.remove("https://goo.gl/") if "https://goo.gl/" in googl_urls else None

crawled = get_crawled(conn)
to_crawl = googl_urls - crawled

print(f"Total: {len(googl_urls)}, Crawled: {len(crawled)}, To crawl: {len(to_crawl)}")

# 302 is redirect, 404 is not found, 403 is banned, 302 to https://www.google.com/sorry/index* is rate limited, 200 is for deleted URLs

sess = httpx.AsyncClient(timeout=6)
sess.headers["User-Agent"] = "savethewebproject/googlrot (+github.com/saveweb)"

to_crawl = list(to_crawl)
to_crawl.sort()

to_crawl_queue = asyncio.Queue()
for url in to_crawl:
to_crawl_queue.put_nowait(url)

async def progress(conn: sqlite3.Connection):
start = time.time()
await asyncio.sleep(1)
while True:
now = time.time()
# eta = left / speed; guard against division by zero before anything has finished
left = to_crawl_queue.qsize()
speed = (len(to_crawl) - left) / (now - start)
eta = left / speed if speed > 0 else float("inf")
print(f"Progress: {left}/{len(to_crawl)}, ETA: {eta:.2f}s")
await asyncio.sleep(1)
conn.commit()

async def crawl_worker():
while True:
url = await to_crawl_queue.get()
try:
r = await sess.get(url, follow_redirects=False)
r.content
if ratelimited(r):
print("Rate limited")
await asyncio.sleep(3)
continue
if r.status_code == 302:
write_url(conn, url, "302", r.headers["Location"])
elif r.status_code == 301:
write_url(conn, url, "301", r.headers["Location"])
elif r.status_code == 404:
write_url(conn, url, "404", "")
elif r.status_code == 403:
write_url(conn, url, "403", "")
elif r.status_code == 200:
write_url(conn, url, "200", "")
elif r.status_code == 400:
if "blocked" in r.text:
write_url(conn, url, "400", "blocked")
else:
print("")
print(url)
print(f"Unknown status code {r.status_code}")
print(r.headers)
print(r.text)
except Exception as e:
print(f"Error: {e}")
continue
finally:
to_crawl_queue.task_done()
async with asyncio.TaskGroup() as group:
group.create_task(progress(conn=conn))
for _ in range(50):
group.create_task(crawl_worker())
await to_crawl_queue.join()
conn.commit()

await crawl()

if __name__ == "__main__":
asyncio.run(main())
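For reference only, and not part of this commit: given the urls table created by init_db above (url, status, redirect_url), the collected mappings can be read back out of googl_urls.db with a short query. A minimal sketch, filtering on the 301/302 status values written by crawl_worker:

import sqlite3

conn = sqlite3.connect("googl_urls.db")
# Only 301/302 rows carry a usable redirect target; 200/403/404 rows do not.
rows = conn.execute(
    "SELECT url, redirect_url FROM urls WHERE status IN ('301', '302')"
).fetchall()
for short_url, target in rows:
    print(short_url, "->", target)
conn.close()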