Skip to content

Commit 60a802d

Browse files
authored
enhance: [2.6] Show create time for import job (#45059)
issue: #45056 pr: #45058 --------- Signed-off-by: bigsheeper <[email protected]>
1 parent 71d75a1 commit 60a802d

File tree

10 files changed

+928
-813
lines changed

10 files changed

+928
-813
lines changed

internal/datacoord/import_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ type ImportJob interface {
118118
GetState() internalpb.ImportJobState
119119
GetReason() string
120120
GetRequestedDiskSize() int64
121-
GetStartTime() string
121+
GetCreateTime() string
122122
GetCompleteTime() string
123123
GetFiles() []*internalpb.ImportFile
124124
GetOptions() []*commonpb.KeyValuePair

internal/datacoord/services.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1898,7 +1898,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
18981898
if jobID == 0 {
18991899
jobID = idStart
19001900
}
1901-
startTime := time.Now()
1901+
createTime := time.Now()
19021902
job := &importJob{
19031903
ImportJob: &datapb.ImportJob{
19041904
JobID: jobID,
@@ -1912,7 +1912,7 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
19121912
State: internalpb.ImportJobState_Pending,
19131913
Files: files,
19141914
Options: in.GetOptions(),
1915-
StartTime: startTime.Format("2006-01-02T15:04:05Z07:00"),
1915+
CreateTime: createTime.Format("2006-01-02T15:04:05Z07:00"),
19161916
ReadyVchannels: in.GetChannelNames(),
19171917
DataTs: in.GetDataTimestamp(),
19181918
},
@@ -1961,7 +1961,7 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport
19611961
resp.Reason = reason
19621962
resp.Progress = progress
19631963
resp.CollectionName = job.GetCollectionName()
1964-
resp.StartTime = job.GetStartTime()
1964+
resp.CreateTime = job.GetCreateTime()
19651965
resp.CompleteTime = job.GetCompleteTime()
19661966
resp.ImportedRows = importedRows
19671967
resp.TotalRows = totalRows

internal/distributed/proxy/httpserver/handler_v2.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2737,6 +2737,7 @@ func (h *HandlersV2) getImportJobProcess(ctx context.Context, c *gin.Context, an
27372737
response := resp.(*internalpb.GetImportProgressResponse)
27382738
returnData := make(map[string]interface{})
27392739
returnData["jobId"] = jobIDGetter.GetJobID()
2740+
returnData["createTime"] = response.GetCreateTime()
27402741
returnData["collectionName"] = response.GetCollectionName()
27412742
returnData["completeTime"] = response.GetCompleteTime()
27422743
returnData["state"] = response.GetState().String()

internal/proxy/impl.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4602,7 +4602,7 @@ func convertToV1GetImportResponse(rsp *internalpb.GetImportProgressResponse) *mi
46024602
Value: strconv.FormatInt(rsp.GetProgress(), 10),
46034603
})
46044604
var createTs int64
4605-
createTime, err := time.Parse("2006-01-02T15:04:05Z07:00", rsp.GetStartTime())
4605+
createTime, err := time.Parse("2006-01-02T15:04:05Z07:00", rsp.GetCreateTime())
46064606
if err == nil {
46074607
createTs = createTime.Unix()
46084608
}

internal/util/importutilv2/common/util.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package common
1818

1919
import (
2020
"fmt"
21+
"strings"
22+
"unicode/utf8"
2123

2224
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
2325
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
@@ -55,9 +57,41 @@ func EstimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema
5557
return ret, nil
5658
}
5759

60+
// SafeStringForError returns s in a form that is safe to embed in error
// messages. Valid UTF-8 input is returned unchanged. Otherwise every byte that
// does not belong to a well-formed UTF-8 sequence is replaced by a literal
// `\xNN` escape (two lower-case hex digits), while all well-formed runes are
// copied through, so the result is always valid UTF-8 and avoids gRPC
// serialization errors while still showing the offending bytes.
func SafeStringForError(s string) string {
	if utf8.ValidString(s) {
		return s
	}

	var result strings.Builder
	result.Grow(len(s))
	for i := 0; i < len(s); {
		r, size := utf8.DecodeRuneInString(s[i:])
		// A decoding failure is reported as (RuneError, 1). A correctly encoded
		// U+FFFD also decodes to RuneError but with size 3, and must be kept
		// as-is rather than being mistaken for an invalid byte.
		if r == utf8.RuneError && size == 1 {
			fmt.Fprintf(&result, "\\x%02x", s[i])
		} else {
			result.WriteRune(r)
		}
		i += size
	}
	return result.String()
}
79+
80+
// SafeStringForErrorWithLimit safely converts a string for use in error messages
81+
// with a length limit to prevent extremely long error messages.
82+
func SafeStringForErrorWithLimit(s string, maxLen int) string {
83+
safe := SafeStringForError(s)
84+
if len(safe) <= maxLen {
85+
return safe
86+
}
87+
return safe[:maxLen] + "..."
88+
}
89+
5890
func CheckValidUTF8(s string, field *schemapb.FieldSchema) error {
5991
if !typeutil.IsUTF8(s) {
60-
return fmt.Errorf("field %s contains invalid UTF-8 data, value=%s", field.GetName(), s)
92+
// Use safe string representation to avoid gRPC serialization errors
93+
safeValue := SafeStringForErrorWithLimit(s, 100)
94+
return fmt.Errorf("field '%s' contains invalid UTF-8 data, value=%s", field.GetName(), safeValue)
6195
}
6296
return nil
6397
}

internal/util/importutilv2/common/util_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package common
1818

1919
import (
20+
"strings"
2021
"testing"
22+
"unicode/utf8"
2123

2224
"github.com/stretchr/testify/assert"
2325

@@ -181,3 +183,81 @@ func TestUtil_CheckValidString(t *testing.T) {
181183
err = CheckValidString("aaaaa", 5, fieldSchema)
182184
assert.NoError(t, err)
183185
}
186+
187+
func TestUtil_SafeStringForError(t *testing.T) {
188+
// Test valid UTF-8 string
189+
validStr := "Hello, 世界!"
190+
result := SafeStringForError(validStr)
191+
assert.Equal(t, validStr, result)
192+
193+
// Test invalid UTF-8 string
194+
invalidStr := string([]byte{0xC0, 0xAF, 'a', 'b', 'c'})
195+
result = SafeStringForError(invalidStr)
196+
assert.Contains(t, result, "\\xc0")
197+
assert.Contains(t, result, "\\xaf")
198+
assert.Contains(t, result, "abc")
199+
200+
// Test empty string
201+
result = SafeStringForError("")
202+
assert.Equal(t, "", result)
203+
204+
// Test string with mixed valid and invalid UTF-8
205+
mixedStr := "valid" + string([]byte{0xFF, 0xFE}) + "text"
206+
result = SafeStringForError(mixedStr)
207+
assert.Contains(t, result, "valid")
208+
assert.Contains(t, result, "\\xff")
209+
assert.Contains(t, result, "\\xfe")
210+
assert.Contains(t, result, "text")
211+
}
212+
213+
func TestUtil_SafeStringForErrorWithLimit(t *testing.T) {
214+
// Test string within limit
215+
shortStr := "short"
216+
result := SafeStringForErrorWithLimit(shortStr, 10)
217+
assert.Equal(t, shortStr, result)
218+
219+
// Test string exceeding limit
220+
longStr := "this is a very long string that exceeds the limit"
221+
result = SafeStringForErrorWithLimit(longStr, 20)
222+
assert.Equal(t, 23, len(result)) // 20 chars + "..."
223+
assert.True(t, strings.HasSuffix(result, "..."))
224+
225+
// Test invalid UTF-8 string with limit
226+
invalidStr := string([]byte{0xC0, 0xAF, 0xFF, 0xFE, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z'})
227+
result = SafeStringForErrorWithLimit(invalidStr, 15)
228+
assert.True(t, len(result) <= 18) // 15 chars + "..."
229+
assert.True(t, strings.HasSuffix(result, "..."))
230+
}
231+
232+
func TestUtil_CheckValidUTF8_WithSafeError(t *testing.T) {
233+
fieldSchema := &schemapb.FieldSchema{
234+
FieldID: 1,
235+
Name: "test_field",
236+
DataType: schemapb.DataType_VarChar,
237+
TypeParams: []*commonpb.KeyValuePair{
238+
{
239+
Key: common.MaxLengthKey,
240+
Value: "1000",
241+
},
242+
},
243+
}
244+
245+
// Test with invalid UTF-8 - should not cause gRPC serialization error
246+
invalidStr := string([]byte{0xC0, 0xAF, 0xFF, 0xFE})
247+
err := CheckValidUTF8(invalidStr, fieldSchema)
248+
assert.Error(t, err)
249+
250+
// Verify the error message contains safe representation
251+
errMsg := err.Error()
252+
assert.Contains(t, errMsg, "test_field")
253+
assert.Contains(t, errMsg, "invalid UTF-8 data")
254+
assert.Contains(t, errMsg, "\\xc0") // Should contain hex representation
255+
assert.Contains(t, errMsg, "\\xaf")
256+
257+
// Verify the error message is valid UTF-8 itself
258+
assert.True(t, utf8.ValidString(errMsg), "Error message should be valid UTF-8")
259+
260+
// Test with valid UTF-8
261+
err = CheckValidUTF8("valid string", fieldSchema)
262+
assert.NoError(t, err)
263+
}

pkg/proto/data_coord.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -987,7 +987,7 @@ message ImportJob {
987987
string complete_time = 13;
988988
repeated internal.ImportFile files = 14;
989989
repeated common.KeyValuePair options = 15;
990-
string start_time = 16;
990+
string create_time = 16;
991991
repeated string ready_vchannels = 17;
992992
uint64 data_ts = 18;
993993
}

0 commit comments

Comments
 (0)