-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoauth.py
91 lines (69 loc) · 2.75 KB
/
oauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from google_auth_oauthlib.flow import InstalledAppFlow
from google.oauth2.credentials import Credentials
from oauthlib.oauth2.rfc6749.errors import AccessDeniedError
from google.auth.transport.requests import Request
from google.auth.exceptions import RefreshError
import logging
import logging.handlers
###################### Logger configurations #############################
logger = logging.getLogger("oauth")
logger.setLevel(logging.DEBUG)
styleFormat = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s", datefmt="%d-%m-%Y %H:%M:%S")
fileHandler = logging.handlers.TimedRotatingFileHandler(
filename="logger.log", when="w5")
fileHandler.setFormatter(styleFormat)
fileHandler.setLevel(logging.DEBUG)
logger.addHandler(fileHandler)
########################## Main code ################################
def start_oauth():
flow = InstalledAppFlow.from_client_secrets_file(
'client_secret.json',
scopes=["https://www.googleapis.com/auth/youtube.readonly", "https://www.googleapis.com/auth/youtube.force-ssl"],)
logger.info("Start Oauth Process...")
try:
flow.run_local_server(prompt="consent")
except AccessDeniedError:
logger.info("Access Denied")
return None
try:
credentials = flow.credentials
except ValueError:
logger.info("There was no access token adquired")
logger.info("Run the script again for another try")
return None
logger.info("Tokens Acquired")
logger.info("Saving Tokens")
with open("tokens.json", "w") as f:
f.write(credentials.to_json())
logger.info("Credentiasl Saved")
return credentials
def get_credentials():
try:
logger.info("Loading saved tokens")
credentials = Credentials.from_authorized_user_file("tokens.json")
logger.info("Tokens acquired successfully")
except FileNotFoundError:
logger.info("There is no saved tokens")
return start_oauth()
logger.info("Checking tokens Validity")
if credentials.valid:
logger.info("Tokens are valid")
return credentials
if credentials.expired and credentials.refresh_token:
logger.info("Token is expired")
try:
logger.info("Refreshing tokens")
credentials.refresh(Request())
logger.info("Tokens refreshed successfully")
logger.info("Saving tokens")
with open("tokens.json", "w") as f:
f.write(credentials.to_json())
logger.info("Saved successfully")
return credentials
except RefreshError:
logger.info("Couldn't refresh tokens")
return None
return start_oauth()
if __name__ == '__main__':
start_oauth()