diff --git a/buf/atomic_buf.go b/buf/atomic_buf.go new file mode 100644 index 0000000..a3db7e6 --- /dev/null +++ b/buf/atomic_buf.go @@ -0,0 +1,143 @@ +package buf + +import ( + "bytes" + "sync" + "io" +) + +type AtomicBuffers interface { + Bytes() []byte + Cap() int + Grow(int) + Len() int + Next(int) []byte + ReadByte() (byte, error) + ReadBytes(byte) ([]byte, error) + ReadFrom(io.Reader) (int64, error) + ReadRune() (rune, int, error) + ReadString(byte) (string, error) + Reset() + Truncate(int) + UnreadByte() error + UnreadRune() error + WriteByte(byte) error + WriteRune(rune) (int, error) + WriteString(string) (int, error) + WriteTo(io.Writer) (int64, error) + Read([]byte) (int, error) + Write([]byte) (int, error) + String() string +} + +type atomicBuffer struct { + b bytes.Buffer + m sync.RWMutex +} + +func (b *atomicBuffer) Bytes() []byte { + b.m.RLock() + defer b.m.RUnlock() + return b.b.Bytes() +} +func (b *atomicBuffer) Cap() int { + b.m.RLock() + defer b.m.RUnlock() + return b.b.Cap() +} +func (b *atomicBuffer) Grow(n int) { + b.m.Lock() + defer b.m.Unlock() + b.b.Grow(n) +} +func (b *atomicBuffer) Len() int { + b.m.RLock() + defer b.m.RUnlock() + return b.b.Len() +} +func (b *atomicBuffer) Next(n int) []byte { + b.m.Lock() + defer b.m.Unlock() + return b.b.Next(n) +} +func (b *atomicBuffer) ReadByte() (c byte, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.ReadByte() +} +func (b *atomicBuffer) ReadBytes(delim byte) (line []byte, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.ReadBytes(delim) +} +func (b *atomicBuffer) ReadFrom(r io.Reader) (n int64, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.ReadFrom(r) +} +func (b *atomicBuffer) ReadRune() (r rune, size int, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.ReadRune() +} +func (b *atomicBuffer) ReadString(delim byte) (line string, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.ReadString(delim) +} +func (b *atomicBuffer) Reset() { + b.m.Lock() + defer b.m.Unlock() + b.b.Reset() +} +func (b *atomicBuffer) Truncate(n int) { + b.m.Lock() + defer b.m.Unlock() + b.b.Truncate(n) +} +func (b *atomicBuffer) UnreadByte() error { + b.m.Lock() + defer b.m.Unlock() + return b.b.UnreadByte() +} +func (b *atomicBuffer) UnreadRune() error { + b.m.Lock() + defer b.m.Unlock() + return b.b.UnreadRune() +} +func (b *atomicBuffer) WriteByte(c byte) error { + b.m.Lock() + defer b.m.Unlock() + return b.b.WriteByte(c) +} +func (b *atomicBuffer) WriteRune(r rune) (n int, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.WriteRune(r) +} +func (b *atomicBuffer) WriteString(s string) (n int, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.WriteString(s) +} +func (b *atomicBuffer) WriteTo(w io.Writer) (n int64, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.WriteTo(w) +} + +func (b *atomicBuffer) Read(p []byte) (n int, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.Read(p) +} +func (b *atomicBuffer) Write(p []byte) (n int, err error) { + b.m.Lock() + defer b.m.Unlock() + return b.b.Write(p) +} +func (b *atomicBuffer) String() string { + b.m.RLock() + defer b.m.RUnlock() + return b.b.String() +} diff --git a/buf/atomic_buf_pool.go b/buf/atomic_buf_pool.go new file mode 100644 index 0000000..79fb3d0 --- /dev/null +++ b/buf/atomic_buf_pool.go @@ -0,0 +1,28 @@ +package buf + +import ( + "sync" +) + +// atomicBuffer pool to reduce GC +var atoBuffers = sync.Pool{ + // New is called when a new instance is needed + New: func() interface{} { + return new(atomicBuffer) + }, +} + +// GetBuffer fetches a atomicBuffer from the pool +func GetAtoBuffer() AtomicBuffers { + return atoBuffers.Get().(AtomicBuffers) +} + +// PutBuffer returns a atomicBuffer to the pool +func PutAtoBuffer(buf AtomicBuffers) { + bufPtr, ok := buf.(*atomicBuffer) + if !ok { + return + } + bufPtr.Reset() + atoBuffers.Put(bufPtr) +} diff --git a/buf/bytebuf.go b/buf/bytebuf.go new file mode 100644 index 0000000..5202b59 --- /dev/null +++ b/buf/bytebuf.go @@ -0,0 +1,74 @@ +package buf + +import ( + "unicode/utf8" + "unsafe" +) + +// ByteBuffer provides byte buffer, which can be used with fasthttp API +// in order to minimize memory allocations. +// +// ByteBuffer may be used with functions appending data to the given []byte +// slice. See example code for details. +// +// Use AcquireByteBuffer for obtaining an empty byte buffer. +type ByteBuffer struct { + // B is a byte buffer to use in append-like workloads. + // See example code for details. + B []byte +} + + +func (b *ByteBuffer) WriteByte(c byte) error { + b.B = append(b.B, c) + return nil +} + +// Write implements io.Writer - it appends p to ByteBuffer.B +func (b *ByteBuffer) Write(p []byte) (int, error) { + b.B = append(b.B, p...) + return len(p), nil +} + +// WriteString appends s to ByteBuffer.B +func (b *ByteBuffer) WriteString(s string) (int, error) { + b.B = append(b.B, s...) + return len(s), nil +} + +//(r rune) (n int, err error) { +// WriteString appends s to ByteBuffer.B +func (b *ByteBuffer) WriteRune(r rune) (n int, err error) { + if r < utf8.RuneSelf { + b.B = append(b.B, byte(r)) + return 1, nil + } + curSize := len(b.B) + runCharBuf := b.B[curSize:curSize+utf8.UTFMax] + n = utf8.EncodeRune(runCharBuf, r) + b.B = b.B[:curSize+n] + return n, nil +} + +// Set sets ByteBuffer.B to p +func (b *ByteBuffer) Set(p []byte) { + b.B = append(b.B[:0], p...) +} + +// SetString sets ByteBuffer.B to s +func (b *ByteBuffer) SetString(s string) { + b.B = append(b.B[:0], s...) +} + +// Reset makes ByteBuffer.B empty. +func (b *ByteBuffer) Reset() { + b.B = b.B[:0] +} + +// String returns the accumulated string. +func (b *ByteBuffer) String() string { + return *(*string)(unsafe.Pointer(&b.B)) +} + +// Len returns the number of accumulated bytes; b.Len() == len(b.String()). +func (b *ByteBuffer) Len() int { return len(b.B) } \ No newline at end of file diff --git a/buf/bytebuf_pool.go b/buf/bytebuf_pool.go index 2a2a0b4..467163a 100644 --- a/buf/bytebuf_pool.go +++ b/buf/bytebuf_pool.go @@ -2,72 +2,12 @@ package buf import ( "sync" - "unicode/utf8" ) const ( defaultByteBufferSize = 4096 ) -// ByteBuffer provides byte buffer, which can be used with fasthttp API -// in order to minimize memory allocations. -// -// ByteBuffer may be used with functions appending data to the given []byte -// slice. See example code for details. -// -// Use AcquireByteBuffer for obtaining an empty byte buffer. -type ByteBuffer struct { - // B is a byte buffer to use in append-like workloads. - // See example code for details. - B []byte -} - -func (b *ByteBuffer) WriteByte(c byte) error { - b.B = append(b.B, c) - return nil -} - -// Write implements io.Writer - it appends p to ByteBuffer.B -func (b *ByteBuffer) Write(p []byte) (int, error) { - b.B = append(b.B, p...) - return len(p), nil -} - -// WriteString appends s to ByteBuffer.B -func (b *ByteBuffer) WriteString(s string) (int, error) { - b.B = append(b.B, s...) - return len(s), nil -} - -//(r rune) (n int, err error) { -// WriteString appends s to ByteBuffer.B -func (b *ByteBuffer) WriteRune(r rune) (n int, err error) { - if r < utf8.RuneSelf { - b.B = append(b.B, byte(r)) - return 1, nil - } - curSize := len(b.B) - runCharBuf := b.B[curSize:curSize+utf8.UTFMax] - n = utf8.EncodeRune(runCharBuf, r) - b.B = b.B[:curSize+n] - return n, nil -} - -// Set sets ByteBuffer.B to p -func (b *ByteBuffer) Set(p []byte) { - b.B = append(b.B[:0], p...) -} - -// SetString sets ByteBuffer.B to s -func (b *ByteBuffer) SetString(s string) { - b.B = append(b.B[:0], s...) -} - -// Reset makes ByteBuffer.B empty. -func (b *ByteBuffer) Reset() { - b.B = b.B[:0] -} - type ByteBufferPool struct { pool sync.Pool } diff --git a/buf/bytes_queue.go b/buf/bytes_queue.go index 1cd557e..0dd5cf8 100644 --- a/buf/bytes_queue.go +++ b/buf/bytes_queue.go @@ -4,6 +4,7 @@ import ( "container/list" ) +// 블럭되지 않는 큐체널 func NewBytesQueue() (chan<- []byte, <-chan []byte) { send := make(chan []byte, 1) receive := make(chan []byte, 1) @@ -13,18 +14,14 @@ func NewBytesQueue() (chan<- []byte, <-chan []byte) { func manageBytesQueue(send <-chan []byte, receive chan<- []byte) { queue := list.New() + defer close(receive) for { if front := queue.Front(); front == nil { - if send == nil { - close(receive) - return + if value, ok := <-send; ok { + queue.PushBack(value) + } else { + break } - value, ok := <-send - if !ok { - close(receive) - return - } - queue.PushBack(value) } else { select { case receive <- front.Value.([]byte): @@ -32,8 +29,6 @@ func manageBytesQueue(send <-chan []byte, receive chan<- []byte) { case value, ok := <-send: if ok { queue.PushBack(value) - } else { - send = nil } } } diff --git a/buf/string_queue.go b/buf/string_queue.go index 1f6366b..1be0f61 100644 --- a/buf/string_queue.go +++ b/buf/string_queue.go @@ -14,18 +14,14 @@ func NewStringQueue() (chan<- string, <-chan string) { func manageStringQueue(send <-chan string, receive chan<- string) { queue := list.New() + defer close(receive) for { if front := queue.Front(); front == nil { - if send == nil { - close(receive) - return + if value, ok := <-send; ok { + queue.PushBack(value) + } else { + break } - value, ok := <-send - if !ok { - close(receive) - return - } - queue.PushBack(value) } else { select { case receive <- front.Value.(string): @@ -33,8 +29,6 @@ func manageStringQueue(send <-chan string, receive chan<- string) { case value, ok := <-send: if ok { queue.PushBack(value) - } else { - send = nil } } }