-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathreplica_forwarding.go
145 lines (121 loc) · 3.86 KB
/
replica_forwarding.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import "fmt"
import "io"
import "net"
import "net/http"
import "regexp"
import "os"
import "time"
// proxyTransport is the HTTP transport behind proxyClient, used when forwarding reads to
// replicas. Its dial, keep-alive and TLS handshake durations are all ReplicaProxyTimeout seconds.
var proxyTransport = &http.Transport{
	// Keep the transport's roundTrip() from interfering with verm's own handling of compression.
	DisableCompression: true,

	// Everything below mirrors http.DefaultTransport, except that our timeout is substituted.
	Proxy: http.ProxyFromEnvironment,
	Dial: (&net.Dialer{
		Timeout:   ReplicaProxyTimeout * time.Second,
		KeepAlive: ReplicaProxyTimeout * time.Second,
	}).Dial,
	TLSHandshakeTimeout: ReplicaProxyTimeout * time.Second,
}
var proxyClient = &http.Client{
Timeout: ReplicaProxyTimeout * time.Second,
Transport: proxyTransport,
}
// headerFieldsToReturn lists the response header fields that are copied from a replica's
// response onto the response sent back to the client when a read is forwarded.
var headerFieldsToReturn = []string{
	"Content-Type",
	"Content-Encoding",
	"Content-Length",
	"Content-Range",
	"Last-Modified",
	"ETag",
}
// copyHeaderField copies the first value of the named header field from src to dst, replacing
// whatever dst held for that field. If src has no value for the field (or its first value is
// empty), dst is left untouched.
func copyHeaderField(src, dst http.Header, field string) {
	if v := src.Get(field); v != "" {
		dst.Set(field, v)
	}
}
// copyHeaderFields copies each of the named header fields from src to dst, in the order given,
// using copyHeaderField for every entry.
func copyHeaderFields(src, dst http.Header, fields []string) {
	for i := range fields {
		copyHeaderField(src, dst, fields[i])
	}
}
// hashlikeExpression recognises paths that look like hash-named files: a "/", a two-character
// directory, another "/", then a 41-character name, followed by either a ".ext" suffix or the end
// of the path. The first character of both the directory and the name must be A-Z or a-g; the
// remaining characters come from A-Z, a-z, 0-9, "_" and "-". The pattern is not anchored at the
// start, so any leading directories are accepted.
var hashlikeExpression = regexp.MustCompile(`/[A-Za-g][A-Za-z0-9_-]/[A-Za-g][A-Za-z0-9_-]{40}(\.[A-Za-z0-9]+|$)`)
// shouldForwardRead reports whether a read request is a candidate for forwarding to replicas.
// A request whose last "forward" query parameter is "0" is never forwarded; otherwise the
// request is forwarded only if its path matches hashlikeExpression.
func (server vermServer) shouldForwardRead(req *http.Request) bool {
	if values := req.URL.Query()["forward"]; len(values) > 0 {
		// when the parameter is repeated, the final occurrence decides
		if last := values[len(values)-1]; last == "0" {
			return false
		}
	}
	return hashlikeExpression.MatchString(req.URL.Path)
}
// forwardRead asks the replication targets for the requested file and, if one of them returns
// it, relays that response to the client.
//
// It returns false when every target failed or did not have the file; in that case this function
// has written nothing to w. It returns true once a replica's response has been relayed. If the
// body copy is cut short the function still returns true, because the status line has already
// been sent by then; the failure is reported on stderr instead.
func (server vermServer) forwardRead(w http.ResponseWriter, req *http.Request) bool {
	// we use a separate goroutine to grab the first successful response because we want it to hang
	// around and close any slower connections so they can be reused (per http docs)
	ch := make(chan *http.Response)
	go server.Targets.forwardRequest(w, req, ch)

	resp := <-ch
	if resp == nil {
		// all targets failed or didn't have the file
		return false
	}
	// deferred so the body is released even if writing to the client panics or fails part-way
	defer resp.Body.Close()

	// we got a successful response from at least one of the replicas, copy the winning response over
	copyHeaderFields(resp.Header, w.Header(), headerFieldsToReturn)
	w.WriteHeader(http.StatusOK)
	if _, err := io.Copy(w, resp.Body); err != nil {
		// too late to change the status; record the truncated relay rather than dropping it silently
		fmt.Fprintf(os.Stderr, "Error relaying replica response for %s: %s\n", req.URL.Path, err.Error())
	}
	return true
}
// forwardRequest fans the request out to every replication target concurrently and reports the
// outcome on out: the first non-nil response received is sent there, or nil if no target
// produced one. It then keeps collecting the remaining targets' results, closing the body of
// any later response so its connection can be reused (per the http package docs), and finally
// closes both out and its internal channel.
//
// Exactly one value is sent on out before it is closed. The caller owns the response it
// receives and must close its body.
func (targets *ReplicationTargets) forwardRequest(w http.ResponseWriter, req *http.Request, out chan *http.Response) {
	results := make(chan *http.Response)
	for _, replica := range targets.targets {
		go replica.forwardRequest(w, req, results)
	}

	// each target sends exactly one result, so receive once per target
	delivered := false
	for range targets.targets {
		candidate := <-results
		switch {
		case candidate == nil:
			// this target failed or didn't have the file; nothing to clean up
		case delivered:
			// a faster target already won; just release this connection
			candidate.Body.Close()
		default:
			// first success wins the race and is handed to the caller
			delivered = true
			out <- candidate
		}
	}

	if !delivered {
		out <- nil
	}
	close(out)
	close(results)
}
// forwardRequest issues a GET for reqIn's path to this replication target and sends exactly one
// result on out: the response itself when the target answers 200 OK, or nil in every other
// case (request could not be built, transport error, 404, or any other status).
//
// The forwarded URL carries forward=0. Only the Accept-Encoding request header is passed through
// from reqIn. On success the receiver of out is responsible for closing the response body; in
// all other cases the body (if any) is closed here. Failures other than a plain 404 are logged
// to stderr. The w parameter is not used.
func (target *ReplicationTarget) forwardRequest(w http.ResponseWriter, reqIn *http.Request, out chan *http.Response) {
	path := fmt.Sprintf("http://%s:%s%s?forward=0", target.hostname, target.port, reqIn.URL.Path)

	reqOut, err := http.NewRequest("GET", path, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error setting up request for %s: %s\n", path, err.Error())
		out <- nil
		// reqOut is nil here; carrying on would dereference it and send a second result
		return
	}
	copyHeaderField(reqIn.Header, reqOut.Header, "Accept-Encoding")

	resp, err := proxyClient.Do(reqOut)
	if err != nil {
		// unexpected failure
		fmt.Fprintf(os.Stderr, "Error requesting %s: %s\n", path, err.Error())
		out <- nil
		return
	}

	switch resp.StatusCode {
	case http.StatusOK:
		// success - pass on the response and let the other end close the response
		out <- resp
	case http.StatusNotFound:
		// normal missing case
		resp.Body.Close()
		out <- nil
	default:
		// unexpected HTTP error
		fmt.Fprintf(os.Stderr, "HTTP error requesting %s: %d\n", path, resp.StatusCode)
		resp.Body.Close()
		out <- nil
	}
}