Skip to content

Commit 22aa373

Browse files
authored
Add Span::prepare_for_async method and AbstractSpan trait. (#58)
1 parent 632766f commit 22aa373

File tree

10 files changed

+438
-102
lines changed

10 files changed

+438
-102
lines changed

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,48 @@ async fn main() -> Result<(), Box<dyn Error>> {
170170
}
171171
```
172172

173+
# Advanced APIs
174+
175+
## Async Span APIs
176+
177+
`Span::prepare_for_async` designed for async use cases.
178+
When tags, logs, and attributes (including end time) of the span need to be set in another
179+
thread or coroutine.
180+
181+
`TracingContext::wait` wait for all `AsyncSpan` finished.
182+
183+
```rust
184+
use skywalking::{
185+
trace::tracer::Tracer,
186+
trace::span::AbstractSpan,
187+
};
188+
189+
async fn handle(tracer: Tracer) {
190+
let mut ctx = tracer.create_trace_context();
191+
192+
{
193+
let span = ctx.create_entry_span("op1");
194+
195+
// Create AsyncSpan and drop span.
196+
// Internally, span will occupy the position of finalized span stack.
197+
let mut async_span = span.prepare_for_async();
198+
199+
// Start async route, catch async_span with `move` keyword.
200+
tokio::spawn(async move {
201+
202+
async_span.add_tag("foo", "bar");
203+
204+
// Something...
205+
206+
// async_span will drop here, submit modifications to finalized spans stack.
207+
});
208+
}
209+
210+
// Wait for all `AsyncSpan` finished.
211+
ctx.wait();
212+
}
213+
```
214+
173215
# How to compile?
174216

175217
If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:

e2e/data/expected_context.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ segmentItems:
5050
peer: consumer:8082
5151
skipAnalysis: false
5252
spanId: 1
53-
spanLayer: Http
53+
spanLayer: Unknown
5454
spanType: Exit
5555
startTime: gt 0
5656
- componentId: 11000

e2e/docker/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
#
18-
FROM rust:1.63
19-
RUN apt-get update && apt-get install -y cmake protobuf-compiler=3.12.4-1
18+
FROM rust:1.65
19+
RUN apt-get update && apt-get install -y cmake protobuf-compiler
2020
WORKDIR /build
2121
COPY . /build/
2222
RUN cargo build --release --workspace

src/common/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@
1818
1919
pub mod random_generator;
2020
pub(crate) mod system_time;
21+
pub(crate) mod wait_group;

src/common/wait_group.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
use std::sync::{Arc, Condvar, Mutex};
18+
19+
#[derive(Clone)]
20+
pub(crate) struct WaitGroup {
21+
inner: Arc<Inner>,
22+
}
23+
24+
struct Inner {
25+
var: Condvar,
26+
count: Mutex<usize>,
27+
}
28+
29+
impl Default for WaitGroup {
30+
fn default() -> Self {
31+
Self {
32+
inner: Arc::new(Inner {
33+
var: Condvar::new(),
34+
count: Mutex::new(0),
35+
}),
36+
}
37+
}
38+
}
39+
40+
impl WaitGroup {
41+
pub(crate) fn add(&self, n: usize) {
42+
*self.inner.count.lock().unwrap() += n;
43+
}
44+
45+
pub(crate) fn done(&self) {
46+
let mut count = self.inner.count.lock().unwrap();
47+
*count -= 1;
48+
if *count == 0 {
49+
self.inner.var.notify_all();
50+
}
51+
}
52+
53+
pub(crate) fn wait(self) {
54+
let mut count = self.inner.count.lock().unwrap();
55+
while *count > 0 {
56+
count = self.inner.var.wait(count).unwrap();
57+
}
58+
}
59+
}

src/logging/record.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ use crate::{
2222
log_data_body::Content, JsonLog, KeyStringValuePair, LogData, LogDataBody, LogTags,
2323
TextLog, TraceContext, YamlLog,
2424
},
25-
trace::{span::Span, trace_context::TracingContext},
25+
trace::{
26+
span::{AbstractSpan, Span},
27+
trace_context::TracingContext,
28+
},
2629
};
2730
use std::time::{SystemTime, UNIX_EPOCH};
2831

src/trace/span.rs

Lines changed: 140 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,47 @@
1818
//! Span from Google Dapper Paper.
1919
2020
use crate::{
21-
common::system_time::{fetch_time, TimePeriod},
21+
common::{
22+
system_time::{fetch_time, TimePeriod},
23+
wait_group::WaitGroup,
24+
},
2225
proto::v3::{SpanLayer, SpanObject, SpanType},
23-
trace::trace_context::SpanStack,
26+
trace::trace_context::{SpanStack, SpanUid},
2427
};
25-
use std::{fmt::Formatter, mem::take, sync::Arc};
28+
use std::{
29+
fmt::{self, Formatter},
30+
mem::take,
31+
sync::{Arc, Weak},
32+
};
33+
34+
/// [AbstractSpan] contains methods handle [SpanObject].
35+
pub trait AbstractSpan {
36+
/// Get immutable span object reference.
37+
fn span_object(&self) -> &SpanObject;
38+
39+
/// Mutable with inner span object.
40+
fn span_object_mut(&mut self) -> &mut SpanObject;
41+
42+
/// Get span id.
43+
fn span_id(&self) -> i32 {
44+
self.span_object().span_id
45+
}
46+
47+
/// Add logs to the span.
48+
fn add_log<K, V, I>(&mut self, message: I)
49+
where
50+
K: Into<String>,
51+
V: Into<String>,
52+
I: IntoIterator<Item = (K, V)>,
53+
{
54+
self.span_object_mut().add_log(message)
55+
}
56+
57+
/// Add tag to the span.
58+
fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
59+
self.span_object_mut().add_tag(key, value)
60+
}
61+
}
2662

2763
/// Span is a concept that represents trace information for a single RPC.
2864
/// The Rust SDK supports Entry Span to represent inbound to a service
@@ -61,13 +97,14 @@ use std::{fmt::Formatter, mem::take, sync::Arc};
6197
/// ```
6298
#[must_use = "assign a variable name to guard the span not be dropped immediately."]
6399
pub struct Span {
64-
index: usize,
100+
uid: SpanUid,
65101
obj: Option<SpanObject>,
102+
wg: WaitGroup,
66103
stack: Arc<SpanStack>,
67104
}
68105

69-
impl std::fmt::Debug for Span {
70-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
106+
impl fmt::Debug for Span {
107+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
71108
f.debug_struct("Span")
72109
.field(
73110
"data",
@@ -83,10 +120,11 @@ impl std::fmt::Debug for Span {
83120
const SKYWALKING_RUST_COMPONENT_ID: i32 = 11000;
84121

85122
impl Span {
86-
pub(crate) fn new(index: usize, obj: SpanObject, stack: Arc<SpanStack>) -> Self {
123+
pub(crate) fn new(uid: SpanUid, obj: SpanObject, wg: WaitGroup, stack: Arc<SpanStack>) -> Self {
87124
Self {
88-
index,
125+
uid,
89126
obj: Some(obj),
127+
wg,
90128
stack,
91129
}
92130
}
@@ -115,53 +153,121 @@ impl Span {
115153
}
116154
}
117155

118-
/// Get immutable span object reference.
156+
fn is_active_span(&self) -> bool {
157+
let active_spans = &*self.stack.active();
158+
active_spans
159+
.last()
160+
.map(|span| span.uid() == self.uid)
161+
.unwrap_or_default()
162+
}
163+
164+
/// The [Span] finish at current tracing context, but the current span is
165+
/// still alive, until [AsyncSpan] dropped.
166+
///
167+
/// This method must be called:
168+
///
169+
/// 1. In original thread (tracing context).
170+
/// 2. Current span is active span.
171+
///
172+
/// During alive, tags, logs and attributes of the span could be changed, in
173+
/// any thread.
174+
///
175+
/// # Panics
176+
///
177+
/// Current span could by active span.
178+
pub fn prepare_for_async(mut self) -> AsyncSpan {
179+
if !self.is_active_span() {
180+
panic!("current span isn't active span");
181+
}
182+
183+
self.wg.add(1);
184+
185+
AsyncSpan {
186+
uid: self.uid,
187+
wg: self.wg.clone(),
188+
obj: take(&mut self.obj),
189+
stack: Arc::downgrade(&self.stack),
190+
}
191+
}
192+
}
193+
194+
impl Drop for Span {
195+
/// Set the end time as current time, pop from context active span stack,
196+
/// and push to context spans.
197+
fn drop(&mut self) {
198+
self.stack.finalize_span(self.uid, take(&mut self.obj));
199+
}
200+
}
201+
202+
impl AbstractSpan for Span {
119203
#[inline]
120-
pub fn span_object(&self) -> &SpanObject {
204+
fn span_object(&self) -> &SpanObject {
121205
self.obj.as_ref().unwrap()
122206
}
123207

124-
/// Mutable with inner span object.
125208
#[inline]
126-
pub fn span_object_mut(&mut self) -> &mut SpanObject {
209+
fn span_object_mut(&mut self) -> &mut SpanObject {
127210
self.obj.as_mut().unwrap()
128211
}
212+
}
129213

130-
/// Get span id.
131-
pub fn span_id(&self) -> i32 {
132-
self.span_object().span_id
133-
}
134-
135-
/// Add logs to the span.
136-
pub fn add_log<K, V, I>(&mut self, message: I)
137-
where
138-
K: Into<String>,
139-
V: Into<String>,
140-
I: IntoIterator<Item = (K, V)>,
141-
{
142-
self.span_object_mut().add_log(message)
143-
}
214+
/// Generated by [Span::prepare_for_async], tags, logs and attributes of the
215+
/// span could be changed, in any thread.
216+
///
217+
/// It could be finished when dropped.
218+
#[must_use = "assign a variable name to guard the active span not be dropped immediately."]
219+
pub struct AsyncSpan {
220+
uid: SpanUid,
221+
obj: Option<SpanObject>,
222+
wg: WaitGroup,
223+
stack: Weak<SpanStack>,
224+
}
144225

145-
/// Add tag to the span.
146-
pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
147-
self.span_object_mut().add_tag(key, value)
226+
impl fmt::Debug for AsyncSpan {
227+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
228+
f.debug_struct("AsyncSpan")
229+
.field(
230+
"data",
231+
match self.obj {
232+
Some(ref obj) => obj,
233+
None => &"<none>",
234+
},
235+
)
236+
.finish()
148237
}
149238
}
150239

151-
impl Drop for Span {
152-
/// Set the end time as current time, pop from context active span stack,
153-
/// and push to context spans.
240+
impl Drop for AsyncSpan {
241+
/// Set the end time as current time.
154242
fn drop(&mut self) {
155243
self.stack
156-
.finalize_span(self.index, take(&mut self.obj).unwrap());
244+
.upgrade()
245+
.expect("TracingContext has dropped")
246+
.finalize_async_span(self.uid, take(&mut self.obj).unwrap());
247+
248+
self.wg.done();
249+
}
250+
}
251+
252+
impl AbstractSpan for AsyncSpan {
253+
#[inline]
254+
fn span_object(&self) -> &SpanObject {
255+
self.obj.as_ref().unwrap()
256+
}
257+
258+
#[inline]
259+
fn span_object_mut(&mut self) -> &mut SpanObject {
260+
self.obj.as_mut().unwrap()
157261
}
158262
}
159263

160264
#[cfg(test)]
161265
mod tests {
162266
use super::*;
163267

164-
trait AssertSend: Send {}
268+
trait AssertSend: Send + 'static {}
165269

166270
impl AssertSend for Span {}
271+
272+
impl AssertSend for AsyncSpan {}
167273
}

0 commit comments

Comments
 (0)