-
Notifications
You must be signed in to change notification settings - Fork 207
Expand file tree
/
Copy pathshared_packet_conn.go
More file actions
132 lines (108 loc) · 2.97 KB
/
shared_packet_conn.go
File metadata and controls
132 lines (108 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// SPDX-FileCopyrightText: 2026 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package ice
import (
"context"
"errors"
"io"
"net"
"os"
"sync"
"sync/atomic"
"time"
)
type muxedPacketConn interface {
net.PacketConn
readFromContext(ctx context.Context, b []byte) (int, net.Addr, error)
}
// sharedPacketConn is a reference-counted wrapper around an underlying
// muxedPacketConn owned by a mux. The mux can hand out several wrappers that
// share one underlying connection (for example, alias host candidates
// produced by an AddressRewrite append rule).
//
// Each wrapper owns its own context. Close() cancels that context, which
// unblocks any in-flight ReadFrom on this wrapper — without affecting
// siblings that still hold their own references. The underlying connection
// itself is closed only when the last wrapper is released.
type sharedPacketConn struct {
underlying muxedPacketConn
refs *atomic.Int32
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
closeOnce sync.Once
readDeadline atomic.Pointer[time.Time]
}
// newSharedPacketConn increments the shared refcount and returns a wrapper.
// Each returned wrapper must have Close called exactly once.
func newSharedPacketConn(u muxedPacketConn, refs *atomic.Int32) *sharedPacketConn {
refs.Add(1)
ctx, cancel := context.WithCancel(context.Background())
return &sharedPacketConn{
underlying: u,
refs: refs,
ctx: ctx,
cancel: cancel,
}
}
func (s *sharedPacketConn) ReadFrom(b []byte) (int, net.Addr, error) {
ctx := s.ctx
if ctx.Err() != nil {
return 0, nil, io.ErrClosedPipe
}
if p := s.readDeadline.Load(); p != nil && !p.IsZero() {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(s.ctx, *p)
defer cancel()
}
n, addr, err := s.underlying.readFromContext(ctx, b)
if errors.Is(err, context.DeadlineExceeded) {
return n, addr, os.ErrDeadlineExceeded
}
if errors.Is(err, context.Canceled) {
return n, addr, io.ErrClosedPipe
}
return n, addr, err
}
func (s *sharedPacketConn) WriteTo(b []byte, addr net.Addr) (int, error) {
if s.ctx.Err() != nil {
return 0, io.ErrClosedPipe
}
return s.underlying.WriteTo(b, addr)
}
func (s *sharedPacketConn) LocalAddr() net.Addr {
return s.underlying.LocalAddr()
}
func (s *sharedPacketConn) SetReadDeadline(t time.Time) error {
if s.ctx.Err() != nil {
return io.ErrClosedPipe
}
s.readDeadline.Store(&t)
return nil
}
func (s *sharedPacketConn) SetWriteDeadline(t time.Time) error {
if s.ctx.Err() != nil {
return io.ErrClosedPipe
}
return s.underlying.SetWriteDeadline(t)
}
func (s *sharedPacketConn) SetDeadline(t time.Time) error {
if err := s.SetReadDeadline(t); err != nil {
return err
}
return s.SetWriteDeadline(t)
}
func (s *sharedPacketConn) Close() error {
var err error
fired := false
s.closeOnce.Do(func() {
fired = true
s.cancel()
if s.refs.Add(-1) <= 0 {
err = s.underlying.Close()
}
})
if !fired {
return nil
}
return err
}