Skip to content

Commit

Permalink
api: /censuses/export now streams the response (dirty)
Browse files Browse the repository at this point in the history
  • Loading branch information
altergui committed Sep 16, 2024
1 parent 9fbb868 commit c318727
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 9 deletions.
1 change: 1 addition & 0 deletions api/censusdb/censusdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ func (c *CensusDB) ExportCensusDB(buffer io.Writer) error {
defer c.Unlock()
// Iterate through all census entries in the DB
err := c.db.Iterate([]byte(censusDBreferencePrefix), func(key, data []byte) bool {
log.Warnf("iterate %x (%s), %x (%s)", key, key, data, data)
censusID := bytes.Clone(key)
dec := gob.NewDecoder(bytes.NewReader(data))
ref := CensusRef{}
Expand Down
24 changes: 15 additions & 9 deletions api/censuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,27 +984,33 @@ func (a *API) censusListHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext)
// @Router /censuses/export/ipfs [get]
// @Router /censuses/export [get]
func (a *API) censusExportDBHandler(_ *apirest.APIdata, ctx *httprouter.HTTPContext) error {
isIPFSExport := strings.HasSuffix(ctx.Request.URL.Path, "ipfs")
buf := bytes.Buffer{}
if err := a.censusdb.ExportCensusDB(&buf); err != nil {
w, err := ctx.SendStream(apirest.HTTPstatusOK)
if err != nil {
return err
}
var data []byte
if isIPFSExport {
if strings.HasSuffix(ctx.Request.URL.Path, "ipfs") {
buf := bytes.Buffer{}
log.Warn("start export censusdb") // debug
if err := a.censusdb.ExportCensusDB(&buf); err != nil {
return err
}
log.Warn("end export censusdb") // debug
uri, err := a.storage.PublishReader(ctx.Request.Context(), &buf)
if err != nil {
return err
}
data, err = json.Marshal(map[string]string{
data, err := json.Marshal(map[string]string{
"uri": uri,
})
if err != nil {
return err
}
} else {
data = buf.Bytes()
_, err = w.Write(data)
return err
}
return ctx.Send(data, apirest.HTTPstatusOK)
log.Warn("start export censusdb") // debug
defer log.Warn("end export censusdb") // debug
return a.censusdb.ExportCensusDB(w)
}

// censusImportHandler
Expand Down
41 changes: 41 additions & 0 deletions httprouter/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package httprouter

import (
"fmt"
"io"
"net/http"
"time"
"unicode/utf8"
Expand Down Expand Up @@ -114,3 +115,43 @@ func (h *HTTPContext) Send(msg []byte, httpStatusCode int) error {
_, err := h.writer.Write([]byte("\n"))
return err
}

// SendStream replies the request with the status code and returns a io.Writer so the data can be written into it.
func (h *HTTPContext) SendStream(httpStatusCode int) (io.Writer, error) {
defer func() {
if r := recover(); r != nil {
log.Warnf("recovered http send panic: %v", r)
}
}()
defer close(h.sent)

if httpStatusCode < 100 || httpStatusCode >= 600 {
return nil, fmt.Errorf("http status code %d not supported", httpStatusCode)
}
if h.Request.Context().Err() != nil {
// The connection was closed, so don't try to write to it.
return nil, fmt.Errorf("connection is closed")
}
// Set the content type if not set to default application/json
if h.contentType == "" {
h.writer.Header().Set("Content-Type", DefaultContentType)
} else {
h.writer.Header().Set("Content-Type", h.contentType)
}

// Special handling for no content, reset content, and not modified.
if httpStatusCode == http.StatusNoContent ||
httpStatusCode == http.StatusResetContent ||
httpStatusCode == http.StatusNotModified ||
(httpStatusCode >= 100 && httpStatusCode < 200) {
// Don't set Content-Length, don't try to write a body.
h.writer.WriteHeader(httpStatusCode)
log.Debugw("http response", "status", httpStatusCode)
return nil, nil
}

h.writer.WriteHeader(httpStatusCode)

log.Debugw("http response will be streamed in chunks", "status", httpStatusCode)
return h.writer, nil
}

0 comments on commit c318727

Please sign in to comment.