-
Notifications
You must be signed in to change notification settings - Fork 207
Expand file tree
/
Copy pathtransport.go
More file actions
282 lines (233 loc) · 6.85 KB
/
transport.go
File metadata and controls
282 lines (233 loc) · 6.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
// SPDX-FileCopyrightText: 2026 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package ice
import (
"context"
"net"
"sync/atomic"
"time"
"github.com/pion/stun/v3"
)
// AwaitConnect blocks until a candidate pair is selected, the agent's loop
// terminates, or ctx is done, whichever happens first.
//
// It returns nil once the agent signals a connection, the loop's error if the
// loop finished first, and ErrCanceledByCaller if ctx was done first.
func (a *Agent) AwaitConnect(ctx context.Context) error {
	select {
	case <-a.onConnected:
		return nil
	case <-ctx.Done():
		return ErrCanceledByCaller
	case <-a.loop.Done():
		return a.loop.Err()
	}
}
// StartDial begins connecting to the remote agent in the controlling role and
// returns without waiting for a candidate pair to connect.
//
// remoteUfrag and remotePwd are the remote agent's ICE credentials. On failure
// the returned Conn is nil.
func (a *Agent) StartDial(remoteUfrag, remotePwd string) (*Conn, error) {
	const isControlling = true

	conn, err := a.startConnect(isControlling, remoteUfrag, remotePwd)
	if err == nil {
		return conn, nil
	}

	return nil, err
}
// Dial starts connecting as the controlling agent and then blocks until at
// least one ICE candidate pair has successfully connected.
//
// It returns a nil Conn together with the error if starting the connection
// fails or if AwaitConnect reports an error (for example when ctx is done).
func (a *Agent) Dial(ctx context.Context, remoteUfrag, remotePwd string) (*Conn, error) {
	conn, err := a.StartDial(remoteUfrag, remotePwd) //nolint:contextcheck
	if err != nil {
		return nil, err
	}

	if err = a.AwaitConnect(ctx); err != nil {
		return nil, err
	}

	return conn, nil
}
// StartAccept begins connecting to the remote agent in the controlled role and
// returns without waiting for a candidate pair to connect.
//
// remoteUfrag and remotePwd are the remote agent's ICE credentials. On failure
// the returned Conn is nil.
func (a *Agent) StartAccept(remoteUfrag, remotePwd string) (*Conn, error) {
	const isControlling = false

	conn, err := a.startConnect(isControlling, remoteUfrag, remotePwd)
	if err == nil {
		return conn, nil
	}

	return nil, err
}
// Accept starts connecting as the controlled agent and then blocks until at
// least one ICE candidate pair has successfully connected.
//
// It returns a nil Conn together with the error if starting the connection
// fails or if AwaitConnect reports an error (for example when ctx is done).
func (a *Agent) Accept(ctx context.Context, remoteUfrag, remotePwd string) (*Conn, error) {
	conn, err := a.StartAccept(remoteUfrag, remotePwd) //nolint:contextcheck
	if err != nil {
		return nil, err
	}

	if err = a.AwaitConnect(ctx); err != nil {
		return nil, err
	}

	return conn, nil
}
// Conn represents the ICE connection.
// At the moment the lifetime of the Conn is equal to the Agent.
type Conn struct {
	// bytesReceived is the running total of bytes returned by Read.
	bytesReceived atomic.Uint64
	// bytesSent is the running total of bytes successfully written; it is
	// exposed through BytesSent.
	bytesSent atomic.Uint64
	// agent is the Agent that owns this connection and performs the I/O.
	agent *Agent
}
// BytesSent reports the running total of bytes sent on this Conn.
// The counter is read atomically.
func (c *Conn) BytesSent() uint64 {
	sent := c.bytesSent.Load()

	return sent
}
// BytesReceived reports the running total of bytes received on this Conn.
// The counter is read atomically.
func (c *Conn) BytesReceived() uint64 {
	received := c.bytesReceived.Load()

	return received
}
// startConnect is the shared implementation behind StartDial and StartAccept.
//
// It fails fast with the loop's error if the agent's loop has already
// reported one, then starts connectivity checks in the requested role using
// the remote credentials. On success it returns a new Conn bound to the agent.
func (a *Agent) startConnect(isControlling bool, remoteUfrag, remotePwd string) (*Conn, error) {
	if loopErr := a.loop.Err(); loopErr != nil {
		return nil, loopErr
	}

	if checkErr := a.startConnectivityChecks(isControlling, remoteUfrag, remotePwd); checkErr != nil { //nolint:contextcheck
		return nil, checkErr
	}

	conn := &Conn{agent: a}

	return conn, nil
}
// Read implements the Conn Read method.
//
// It returns the loop's error without reading if the agent's loop has already
// reported one. Otherwise it reads from the agent's packet buffer into p and
// adds the number of bytes read to the received-bytes counter.
func (c *Conn) Read(p []byte) (int, error) {
	if loopErr := c.agent.loop.Err(); loopErr != nil {
		return 0, loopErr
	}

	read, err := c.agent.buf.Read(p)
	c.bytesReceived.Add(uint64(read)) //nolint:gosec // G115

	return read, err
}
// Write implements the Conn Write method.
//
// It refuses STUN messages (errWriteSTUNMessageToIceConn) and returns the
// loop's error if the agent's loop has already reported one. The packet is
// sent over the selected candidate pair; when no pair is selected yet, the
// best valid candidate pair is looked up on the agent loop and used instead.
//
// NOTE(review): when neither a selected nor a valid pair exists this returns
// (0, nil), i.e. the packet is dropped without an error — confirm callers
// rely on this before changing it.
func (c *Conn) Write(packet []byte) (int, error) {
	agent := c.agent

	if loopErr := agent.loop.Err(); loopErr != nil {
		return 0, loopErr
	}

	if stun.IsMessage(packet) {
		return 0, errWriteSTUNMessageToIceConn
	}

	target := agent.getSelectedPair()
	if target == nil {
		// Fall back to the best valid pair; the lookup runs on the agent loop.
		runErr := agent.loop.Run(agent.loop, func(_ context.Context) {
			target = agent.getBestValidCandidatePair()
		})
		if runErr != nil {
			return 0, runErr
		}

		if target == nil {
			return 0, nil
		}
	}

	// Send the application data and account only for bytes actually written.
	written, err := target.Write(packet)
	if written > 0 {
		c.bytesSent.Add(uint64(written))
		target.UpdatePacketSent(written)
	}

	return written, err
}
// GetCandidatePairsInfo returns snapshot information for all candidate pairs
// in the agent's checklist. The snapshot is collected on the agent loop.
// Use the returned ID with WriteToPair() to write to a specific pair.
//
// It returns nil if the snapshot could not be run on the agent loop.
func (c *Conn) GetCandidatePairsInfo() []CandidatePairInfo {
	agent := c.agent

	var infos []CandidatePairInfo
	collect := func(_ context.Context) {
		infos = make([]CandidatePairInfo, 0, len(agent.checklist))
		for _, candidatePair := range agent.checklist {
			info := CandidatePairInfo{
				ID:                   candidatePair.id,
				LocalCandidateType:   candidatePair.Local.Type(),
				RemoteCandidateType:  candidatePair.Remote.Type(),
				State:                candidatePair.state,
				Nominated:            candidatePair.nominated,
				CurrentRoundTripTime: time.Duration(atomic.LoadInt64(&candidatePair.currentRoundTripTime)),
				RenominationQuality:  agent.evaluateCandidatePairQuality(candidatePair),
			}
			infos = append(infos, info)
		}
	}

	if err := agent.loop.Run(agent.loop, collect); err != nil {
		return nil
	}

	return infos
}
// WriteToPair writes packet to a specific candidate pair identified by its ID.
// Returns ErrCandidatePairNotFound if the pair ID is not found.
// Returns ErrCandidatePairNotSucceeded if the pair is not in Succeeded state.
// This is useful for sending packets over alternate paths
// even if they are not nominated.
//
// STUN messages are rejected with errWriteSTUNMessageToIceConn, and the
// loop's error is returned if the agent's loop has already reported one.
// Bytes that are actually written are added to the Conn's sent-bytes counter
// (see BytesSent), the same way Write accounts for them, and to the pair's own
// packet statistics.
func (c *Conn) WriteToPair(pairID uint64, packet []byte) (int, error) {
	if err := c.agent.loop.Err(); err != nil {
		return 0, err
	}

	if stun.IsMessage(packet) {
		return 0, errWriteSTUNMessageToIceConn
	}

	var pair *CandidatePair
	var lookupErr error

	// Resolve and validate the pair on the agent loop; the result is carried
	// out of the closure through pair and lookupErr.
	if err := c.agent.loop.Run(c.agent.loop, func(_ context.Context) {
		pair = c.agent.pairsByID[pairID]
		if pair == nil {
			lookupErr = ErrCandidatePairNotFound

			return
		}

		if pair.state != CandidatePairStateSucceeded {
			lookupErr = ErrCandidatePairNotSucceeded
		}
	}); err != nil {
		return 0, err
	}

	if lookupErr != nil {
		return 0, lookupErr
	}

	n, err := pair.Write(packet)
	if n > 0 {
		// Keep the connection-level counter consistent with Write so that
		// BytesSent also reflects traffic sent over explicitly chosen pairs.
		c.bytesSent.Add(uint64(n))
		pair.UpdatePacketSent(n)
	}

	return n, err
}
// Close implements the Conn Close method. It closes the connection by closing
// the owning Agent and returns that call's error. Any calls to Read and Write
// will be unblocked and return an error.
func (c *Conn) Close() error {
	closeErr := c.agent.Close()

	return closeErr
}
// LocalAddr returns the address of the local candidate of the currently
// selected pair, or nil if no pair is selected.
func (c *Conn) LocalAddr() net.Addr {
	if selected := c.agent.getSelectedPair(); selected != nil {
		return selected.Local.addr()
	}

	return nil
}
// RemoteAddr returns the address of the remote candidate of the currently
// selected pair, or nil if no pair is selected.
func (c *Conn) RemoteAddr() net.Addr {
	if selected := c.agent.getSelectedPair(); selected != nil {
		return selected.Remote.addr()
	}

	return nil
}
// SetDeadline sets both the read and the write deadline on the underlying ICE
// connection. The read deadline is applied first; if that fails, its error is
// returned and the write deadline is left untouched.
func (c *Conn) SetDeadline(t time.Time) error {
	readErr := c.SetReadDeadline(t)
	if readErr == nil {
		return c.SetWriteDeadline(t)
	}

	return readErr
}
// SetReadDeadline sets the read deadline on the agent's packet buffer, which
// is where Read takes application data from.
func (c *Conn) SetReadDeadline(t time.Time) error {
	deadlineErr := c.agent.buf.SetReadDeadline(t)

	return deadlineErr
}
// SetWriteDeadline sets the write deadline on the currently selected local candidate connection.
// The deadline applies to the selected candidate pair and will affect all traffic over that pair.
//
// It is a no-op returning nil when no pair is selected, when the selected
// pair has no local candidate, or when the local candidate does not provide a
// setWriteDeadline method.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	// writeDeadliner is the optional capability a local candidate may offer.
	type writeDeadliner interface {
		setWriteDeadline(time.Time) error
	}

	selected := c.agent.getSelectedPair()
	if selected == nil || selected.Local == nil {
		return nil
	}

	setter, ok := selected.Local.(writeDeadliner)
	if !ok {
		return nil
	}

	return setter.setWriteDeadline(t)
}