-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
132 lines (111 loc) · 4.29 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import base64
import hmac
import json
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from time import sleep
from urllib.parse import urlparse

import picamera
# Address handed to the HTTP server constructor. "0.0.0.0" is the IPv4
# wildcard address, so the server is presumably meant to accept connections
# on every network interface -- confirm this exposure is intended.
hostName = "0.0.0.0"
# TCP port handed to the HTTP server constructor.
hostPort = 8080
def deleteFile(fileName):
    """Best-effort removal of ``fileName``.

    Any ``OSError`` raised while removing the file (for example because it
    does not exist) is ignored, so the call never fails for that reason.
    """
    try:
        os.unlink(fileName)
    except OSError:
        # Nothing to clean up, or the file could not be removed: carry on.
        return
class MyServer(BaseHTTPRequestHandler):
    """Request handler serving camera captures behind HTTP Basic auth.

    Routes (all require a matching ``Authorization`` header):

    * ``/``      -- small HTML test page
    * ``/video`` -- records 5 seconds of 640x480 H.264 and returns it
    * ``/pic``   -- captures a 1920x1080 PNG still and returns it

    Any other path gets a 404 JSON error. The expected credentials are read
    from the server object via ``self.server.get_auth_key()``; the camera is
    the module-level ``camera`` object.
    """

    # Token stored by set_auth() on a handler instance. do_GET does not read
    # it; it asks the server object for the expected token instead.
    key = ''

    def set_auth(self, username, password):
        """Store the base64 ``username:password`` token on this handler."""
        self.key = base64.b64encode(
            bytes('%s:%s' % (username, password), 'utf-8')).decode('ascii')

    def get_auth_key(self):
        """Return the token stored by set_auth() ('' if it was never set)."""
        return self.key

    def do_AUTHHEAD(self):
        """Send the 401 status line and headers requesting Basic credentials."""
        self.send_response(401)
        self.send_header(
            'WWW-Authenticate', 'Basic realm="Demo Realm"')
        self.send_header('Content-type', 'application/json')
        self.end_headers()

    def do_GET(self):
        """Authenticate the request, then dispatch on ``self.path``."""
        auth_header = self.headers.get('Authorization')
        if auth_header is None:
            self._reject('No auth header received')
            return
        if not self._is_authorized(auth_header):
            self._reject('Invalid credentials')
            return

        if self.path == "/":
            self._send_front_page()
        elif self.path == "/video":
            self._send_video()
        elif self.path == "/pic":
            self._send_picture()
        else:
            # Without this branch an unknown path produced no response at
            # all, leaving the client with an empty reply.
            self.send_response(404)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self._write_json({'success': False, 'error': 'Not found'})

    def _is_authorized(self, auth_header):
        """Return True if ``auth_header`` matches the server's credentials."""
        expected = 'Basic ' + str(self.server.get_auth_key())
        # Constant-time comparison so response timing does not leak how much
        # of the token matched. Compare bytes: compare_digest rejects
        # non-ASCII str arguments, and the header is client-controlled.
        return hmac.compare_digest(
            auth_header.encode('utf-8'), expected.encode('utf-8'))

    def _write_json(self, payload):
        """Write ``payload`` to the response body as UTF-8 encoded JSON."""
        self.wfile.write(bytes(json.dumps(payload), 'utf-8'))

    def _reject(self, message):
        """Send a 401 response whose JSON body carries ``message``."""
        self.do_AUTHHEAD()
        self._write_json({'success': False, 'error': message})

    def _send_front_page(self):
        """Send the static HTML test page."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(bytes("<html><head><title>Title goes here.</title></head>", "utf-8"))
        self.wfile.write(bytes("<body><p>This is a test.</p>", "utf-8"))
        self.wfile.write(bytes("<p>You accessed path: %s</p>" % self.path, "utf-8"))
        self.wfile.write(bytes("</body></html>", "utf-8"))

    def _send_video(self):
        """Record a 5 second clip to a temporary file and send it."""
        path_to_video = '/home/pi/video.h264'
        # Remove any stale file left over from a previous request.
        deleteFile(path_to_video)
        camera.resolution = (640, 480)
        camera.start_recording(path_to_video)
        camera.wait_recording(5)
        camera.stop_recording()
        self._send_file(path_to_video, 'video/h264')

    def _send_picture(self):
        """Capture a still image to a temporary file and send it."""
        path_to_image = '/home/pi/image.png'
        # Remove any stale file left over from a previous request.
        deleteFile(path_to_image)
        camera.resolution = (1920, 1080)
        camera.capture(path_to_image)
        self._send_file(path_to_image, 'image/png')

    def _send_file(self, path, content_type):
        """Send the file at ``path`` as a 200 response, then delete it.

        The file is closed and removed even if reading it fails, so a failed
        request does not leak a file handle or leave the capture on disk.
        """
        try:
            with open(path, 'rb') as source:
                payload = source.read()
        finally:
            deleteFile(path)
        self.send_response(200)
        self.send_header('Content-type', content_type)
        self.send_header("Content-length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
class CustomHTTPServer(HTTPServer):
    """HTTPServer that also holds the expected HTTP Basic-auth token."""

    # Base64 "username:password" token; empty until set_auth() is called.
    key = ''

    def set_auth(self, username, password):
        """Encode ``username:password`` as base64 and keep it on the server."""
        credentials = ':'.join((str(username), str(password)))
        encoded = base64.b64encode(credentials.encode('utf-8'))
        self.key = encoded.decode('ascii')

    def get_auth_key(self):
        """Return the stored base64 token ('' if set_auth() was never called)."""
        return self.key
# Start-up: bind the HTTP server, open the camera, then serve until Ctrl-C.
myServer = CustomHTTPServer((hostName, hostPort), MyServer)
print(time.asctime(), "Server Starts - %s:%s" % (hostName, hostPort))

# Module-level camera object used by the request handler.
camera = picamera.PiCamera()
# Turn the camera's LED off
camera.led = False

try:
    # NOTE(review): hard-coded default credentials -- anyone who can reach
    # the port can authenticate with admin/admin. Consider loading these
    # from configuration instead.
    myServer.set_auth('admin', 'admin')
    myServer.serve_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server.
    pass
finally:
    # Release the socket and the camera even if serve_forever() fails with
    # something other than KeyboardInterrupt.
    myServer.server_close()
    camera.close()
print(time.asctime(), "Server Stops - %s:%s" % (hostName, hostPort))