Request buffering middleware
This commit is contained in:
parent
d426126a92
commit
a81171d5f1
44 changed files with 2155 additions and 5 deletions
412
vendor/github.com/mailgun/multibuf/buffer.go
generated
vendored
Normal file
412
vendor/github.com/mailgun/multibuf/buffer.go
generated
vendored
Normal file
|
@ -0,0 +1,412 @@
|
|||
// package multibuf implements buffer optimized for streaming large chunks of data,
|
||||
// multiple reads and optional partial buffering to disk.
|
||||
package multibuf
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
)
|
||||
|
||||
// MultiReader provides Read, Close, Seek and Size methods. In addition to that it supports WriterTo interface
|
||||
// to provide efficient writing schemes, as functions like io.Copy use WriterTo when it's available.
|
||||
type MultiReader interface {
|
||||
io.Reader
|
||||
io.Seeker
|
||||
io.Closer
|
||||
io.WriterTo
|
||||
|
||||
// Size calculates and returns the total size of the reader and not the length remaining.
|
||||
Size() (int64, error)
|
||||
}
|
||||
|
||||
// WriterOnce implements write once, read many times writer. Create a WriterOnce and write to it, once Reader() function has been
|
||||
// called, the internal data is transferred to MultiReader and this instance of WriterOnce should be no longer used.
|
||||
type WriterOnce interface {
|
||||
// Write implements io.Writer
|
||||
Write(p []byte) (int, error)
|
||||
// Reader transfers all data written to this writer to MultiReader. If there was no data written it retuns an error
|
||||
Reader() (MultiReader, error)
|
||||
// WriterOnce owns the data before Reader has been called, so Close will close all the underlying files if Reader has not been called.
|
||||
Close() error
|
||||
}
|
||||
|
||||
// MaxBytes, ignored if set to value >=, if request exceeds the specified limit, the reader will return error,
|
||||
// by default buffer is not limited, negative values mean no limit
|
||||
func MaxBytes(m int64) optionSetter {
|
||||
return func(o *options) error {
|
||||
o.maxBytes = m
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// MemBytes specifies the largest buffer to hold in RAM before writing to disk, default is 1MB
|
||||
func MemBytes(m int64) optionSetter {
|
||||
return func(o *options) error {
|
||||
if m < 0 {
|
||||
return fmt.Errorf("MemBytes should be >= 0")
|
||||
}
|
||||
o.memBytes = m
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// NewWriterOnce returns io.ReadWrite compatible object that can limit the size of the buffer and persist large buffers to disk.
|
||||
// WriterOnce implements write once, read many times writer. Create a WriterOnce and write to it, once Reader() function has been
|
||||
// called, the internal data is transferred to MultiReader and this instance of WriterOnce should be no longer used.
|
||||
// By default NewWriterOnce returns unbound buffer that will allow to write up to 1MB in RAM and will start buffering to disk
|
||||
// It supports multiple functional optional arguments:
|
||||
//
|
||||
// // Buffer up to 1MB in RAM and limit max buffer size to 20MB
|
||||
// multibuf.NewWriterOnce(r, multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20))
|
||||
//
|
||||
//
|
||||
func NewWriterOnce(setters ...optionSetter) (WriterOnce, error) {
|
||||
o := options{
|
||||
memBytes: DefaultMemBytes,
|
||||
maxBytes: DefaultMaxBytes,
|
||||
}
|
||||
if o.memBytes == 0 {
|
||||
o.memBytes = DefaultMemBytes
|
||||
}
|
||||
for _, s := range setters {
|
||||
if err := s(&o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return &writerOnce{o: o}, nil
|
||||
}
|
||||
|
||||
// New returns MultiReader that can limit the size of the buffer and persist large buffers to disk.
|
||||
// By default New returns unbound buffer that will read up to 1MB in RAM and will start buffering to disk
|
||||
// It supports multiple functional optional arguments:
|
||||
//
|
||||
// // Buffer up to 1MB in RAM and limit max buffer size to 20MB
|
||||
// multibuf.New(r, multibuf.MemBytes(1024 * 1024), multibuf.MaxBytes(1024 * 1024 * 20))
|
||||
//
|
||||
//
|
||||
func New(input io.Reader, setters ...optionSetter) (MultiReader, error) {
|
||||
o := options{
|
||||
memBytes: DefaultMemBytes,
|
||||
maxBytes: DefaultMaxBytes,
|
||||
}
|
||||
|
||||
for _, s := range setters {
|
||||
if err := s(&o); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if o.memBytes == 0 {
|
||||
o.memBytes = DefaultMemBytes
|
||||
}
|
||||
if o.maxBytes > 0 && o.maxBytes < o.memBytes {
|
||||
o.memBytes = o.maxBytes
|
||||
}
|
||||
|
||||
memReader := &io.LimitedReader{
|
||||
R: input, // Read from this reader
|
||||
N: o.memBytes, // Maximum amount of data to read
|
||||
}
|
||||
readers := make([]io.ReadSeeker, 0, 2)
|
||||
|
||||
buffer, err := ioutil.ReadAll(memReader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
readers = append(readers, bytes.NewReader(buffer))
|
||||
|
||||
var file *os.File
|
||||
// This means that we have exceeded all the memory capacity and we will start buffering the body to disk.
|
||||
totalBytes := int64(len(buffer))
|
||||
if memReader.N <= 0 {
|
||||
file, err = ioutil.TempFile("", tempFilePrefix)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
os.Remove(file.Name())
|
||||
|
||||
readSrc := input
|
||||
if o.maxBytes > 0 {
|
||||
readSrc = &maxReader{R: input, Max: o.maxBytes - o.memBytes}
|
||||
}
|
||||
|
||||
writtenBytes, err := io.Copy(file, readSrc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
totalBytes += writtenBytes
|
||||
file.Seek(0, 0)
|
||||
readers = append(readers, file)
|
||||
}
|
||||
|
||||
var cleanupFn cleanupFunc
|
||||
if file != nil {
|
||||
cleanupFn = func() error {
|
||||
file.Close()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return newBuf(totalBytes, cleanupFn, readers...), nil
|
||||
}
|
||||
|
||||
// MaxSizeReachedError is returned when the maximum allowed buffer size is reached when reading
|
||||
type MaxSizeReachedError struct {
|
||||
MaxSize int64
|
||||
}
|
||||
|
||||
func (e *MaxSizeReachedError) Error() string {
|
||||
return fmt.Sprintf("Maximum size %d was reached", e)
|
||||
}
|
||||
|
||||
const (
|
||||
DefaultMemBytes = 1048576
|
||||
DefaultMaxBytes = -1
|
||||
// Equivalent of bytes.MinRead used in ioutil.ReadAll
|
||||
DefaultBufferBytes = 512
|
||||
)
|
||||
|
||||
// Constraints:
|
||||
// - Implements io.Reader
|
||||
// - Implements Seek(0, 0)
|
||||
// - Designed for Write once, Read many times.
|
||||
type multiReaderSeek struct {
|
||||
length int64
|
||||
readers []io.ReadSeeker
|
||||
mr io.Reader
|
||||
cleanup cleanupFunc
|
||||
}
|
||||
|
||||
type cleanupFunc func() error
|
||||
|
||||
func newBuf(length int64, cleanup cleanupFunc, readers ...io.ReadSeeker) *multiReaderSeek {
|
||||
converted := make([]io.Reader, len(readers))
|
||||
for i, r := range readers {
|
||||
// This conversion is safe as ReadSeeker includes Reader
|
||||
converted[i] = r.(io.Reader)
|
||||
}
|
||||
|
||||
return &multiReaderSeek{
|
||||
length: length,
|
||||
readers: readers,
|
||||
mr: io.MultiReader(converted...),
|
||||
cleanup: cleanup,
|
||||
}
|
||||
}
|
||||
|
||||
func (mr *multiReaderSeek) Close() (err error) {
|
||||
if mr.cleanup != nil {
|
||||
return mr.cleanup()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mr *multiReaderSeek) WriteTo(w io.Writer) (int64, error) {
|
||||
b := make([]byte, DefaultBufferBytes)
|
||||
var total int64
|
||||
for {
|
||||
n, err := mr.mr.Read(b)
|
||||
// Recommended way is to always handle non 0 reads despite the errors
|
||||
if n > 0 {
|
||||
nw, errw := w.Write(b[:n])
|
||||
total += int64(nw)
|
||||
// Write must return a non-nil error if it returns nw < n
|
||||
if nw != n || errw != nil {
|
||||
return total, errw
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return total, nil
|
||||
}
|
||||
return total, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (mr *multiReaderSeek) Read(p []byte) (n int, err error) {
|
||||
return mr.mr.Read(p)
|
||||
}
|
||||
|
||||
func (mr *multiReaderSeek) Size() (int64, error) {
|
||||
return mr.length, nil
|
||||
}
|
||||
|
||||
func (mr *multiReaderSeek) Seek(offset int64, whence int) (int64, error) {
|
||||
// TODO: implement other whence
|
||||
// TODO: implement real offsets
|
||||
|
||||
if whence != 0 {
|
||||
return 0, fmt.Errorf("multiReaderSeek: unsupported whence")
|
||||
}
|
||||
|
||||
if offset != 0 {
|
||||
return 0, fmt.Errorf("multiReaderSeek: unsupported offset")
|
||||
}
|
||||
|
||||
for _, seeker := range mr.readers {
|
||||
seeker.Seek(0, 0)
|
||||
}
|
||||
|
||||
ior := make([]io.Reader, len(mr.readers))
|
||||
for i, arg := range mr.readers {
|
||||
ior[i] = arg.(io.Reader)
|
||||
}
|
||||
mr.mr = io.MultiReader(ior...)
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
type options struct {
|
||||
// MemBufferBytes sets up the size of the memory buffer for this request.
|
||||
// If the data size exceeds the limit, the remaining request part will be saved on the file system.
|
||||
memBytes int64
|
||||
|
||||
maxBytes int64
|
||||
}
|
||||
|
||||
type optionSetter func(o *options) error
|
||||
|
||||
// MaxReader does not allow to read more than Max bytes and returns error if this limit has been exceeded.
|
||||
type maxReader struct {
|
||||
R io.Reader // underlying reader
|
||||
N int64 // bytes read
|
||||
Max int64 // max bytes to read
|
||||
}
|
||||
|
||||
func (r *maxReader) Read(p []byte) (int, error) {
|
||||
readBytes, err := r.R.Read(p)
|
||||
if err != nil && err != io.EOF {
|
||||
return readBytes, err
|
||||
}
|
||||
|
||||
r.N += int64(readBytes)
|
||||
if r.N > r.Max {
|
||||
return readBytes, &MaxSizeReachedError{MaxSize: r.Max}
|
||||
}
|
||||
return readBytes, err
|
||||
}
|
||||
|
||||
const (
|
||||
writerInit = iota
|
||||
writerMem
|
||||
writerFile
|
||||
writerCalledRead
|
||||
writerErr
|
||||
)
|
||||
|
||||
type writerOnce struct {
|
||||
o options
|
||||
err error
|
||||
state int
|
||||
mem *bytes.Buffer
|
||||
file *os.File
|
||||
total int64
|
||||
cleanupFn cleanupFunc
|
||||
}
|
||||
|
||||
// how many bytes we can still write to memory
|
||||
func (w *writerOnce) writeToMem(p []byte) int {
|
||||
left := w.o.memBytes - w.total
|
||||
if left <= 0 {
|
||||
return 0
|
||||
}
|
||||
bufLen := len(p)
|
||||
if int64(bufLen) < left {
|
||||
return bufLen
|
||||
}
|
||||
return int(left)
|
||||
}
|
||||
|
||||
func (w *writerOnce) Write(p []byte) (int, error) {
|
||||
out, err := w.write(p)
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (w *writerOnce) Close() error {
|
||||
if w.file != nil {
|
||||
return w.file.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *writerOnce) write(p []byte) (int, error) {
|
||||
if w.o.maxBytes > 0 && int64(len(p))+w.total > w.o.maxBytes {
|
||||
return 0, fmt.Errorf("total size of %d exceeded allowed %d", int64(len(p))+w.total, w.o.maxBytes)
|
||||
}
|
||||
switch w.state {
|
||||
case writerCalledRead:
|
||||
return 0, fmt.Errorf("can not write after reader has been called")
|
||||
case writerInit:
|
||||
w.mem = &bytes.Buffer{}
|
||||
w.state = writerMem
|
||||
fallthrough
|
||||
case writerMem:
|
||||
writeToMem := w.writeToMem(p)
|
||||
if writeToMem > 0 {
|
||||
wrote, err := w.mem.Write(p[:writeToMem])
|
||||
w.total += int64(wrote)
|
||||
if err != nil {
|
||||
return wrote, err
|
||||
}
|
||||
}
|
||||
left := len(p) - writeToMem
|
||||
if left <= 0 {
|
||||
return len(p), nil
|
||||
}
|
||||
// we can't write to memory any more, switch to file
|
||||
if err := w.initFile(); err != nil {
|
||||
return int(writeToMem), err
|
||||
}
|
||||
w.state = writerFile
|
||||
wrote, err := w.file.Write(p[writeToMem:])
|
||||
w.total += int64(wrote)
|
||||
return len(p), err
|
||||
case writerFile:
|
||||
wrote, err := w.file.Write(p)
|
||||
w.total += int64(wrote)
|
||||
return wrote, err
|
||||
}
|
||||
return 0, fmt.Errorf("unsupported state: %d", w.state)
|
||||
}
|
||||
|
||||
func (w *writerOnce) initFile() error {
|
||||
file, err := ioutil.TempFile("", tempFilePrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.file = file
|
||||
w.cleanupFn = func() error {
|
||||
file.Close()
|
||||
os.Remove(file.Name())
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *writerOnce) Reader() (MultiReader, error) {
|
||||
switch w.state {
|
||||
case writerInit:
|
||||
return nil, fmt.Errorf("no data ready")
|
||||
case writerCalledRead:
|
||||
return nil, fmt.Errorf("reader has been called")
|
||||
case writerMem:
|
||||
w.state = writerCalledRead
|
||||
return newBuf(w.total, nil, bytes.NewReader(w.mem.Bytes())), nil
|
||||
case writerFile:
|
||||
_, err := w.file.Seek(0, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// we are not responsible for file and buffer any more
|
||||
w.state = writerCalledRead
|
||||
br, fr := bytes.NewReader(w.mem.Bytes()), w.file
|
||||
w.file = nil
|
||||
w.mem = nil
|
||||
return newBuf(w.total, w.cleanupFn, br, fr), nil
|
||||
}
|
||||
return nil, fmt.Errorf("unsupported state: %d\n", w.state)
|
||||
}
|
||||
|
||||
const tempFilePrefix = "temp-multibuf-"
|
Loading…
Add table
Add a link
Reference in a new issue