Skip to content
108 changes: 62 additions & 46 deletions src/main/java/com/qiniu/storage/FixBlockUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.util.*;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand All @@ -30,13 +29,15 @@ public class FixBlockUploader {
private String host = null;

/**
* @param blockSize must be multiples of 4M.
* @param blockSize block size, eg: 4 * 1024 * 1024
* @param configuration Nullable, if null, then create a new one.
* @param client Nullable, if null, then create a new one with configuration.
* @param recorder Nullable.
*/
public FixBlockUploader(int blockSize, Configuration configuration, Client client, Recorder recorder) {
assert blockSize > 0 && blockSize % (4 * 1024 * 1024) == 0 : "blockSize must be multiples of 4M ";
if (blockSize <= 0) {
blockSize = 4 * 1024 * 1024;
}

if (configuration == null) {
configuration = new Configuration();
Expand Down Expand Up @@ -107,9 +108,9 @@ Response upload(BlockData blockData, Token token, String key, OptionsMeta params
/*
上传到七牛存储保存的文件名, 需要进行UrlSafeBase64编码。
注意:
当设置为空时表示空的文件名;
当设置为未进行 UrlSafeBase64 编码的字符 ~ 的时候,表示未设置文件名,
具体行为如分片上传v1: 使用文件的hash最为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名
当 key 为空 "" 时表示空的文件名,正常进行 url_safe_base64 编码;
当 key 为未进行 UrlSafeBase64 编码的字符 ~ 的时候表示未设置文件名,
具体行为如分片上传v1: 使用文件的 hash 作为文件名, 如果设置了saveKey则使用saveKey的规则进行文件命名
*/
String base64Key = key != null ? UrlSafeBase64.encodeToString(key) : "~";
String recordFileKey = (recorder == null) ? "NULL"
Expand All @@ -120,12 +121,15 @@ Response upload(BlockData blockData, Token token, String key, OptionsMeta params
host = configHelper.upHost(token.getUpToken());
}
UploadRecordHelper recordHelper = new UploadRecordHelper(recorder, recordFileKey, blockData.repeatable());
// 1. initParts
Record record = initUpload(blockData, recordHelper, bucket, base64Key, token);
boolean repeatable = recorder != null && blockData.repeatable();

Response res;
try {
// 2. uploadPart
upBlock(blockData, token, bucket, base64Key, repeatable, record, pool, maxRunningBlock);
// 3. completeParts
res = makeFile(bucket, base64Key, token, record.uploadId, record.etagIdxes,
blockData.getFileName(), params);
} catch (QiniuException e) {
Expand Down Expand Up @@ -154,14 +158,15 @@ record = null;
}

if (record == null || record.uploadId == null) {
String uploadId = init(bucket, base64Key, token.getUpToken());
InitRet ret = init(bucket, base64Key, token.getUpToken());

List<EtagIdx> etagIdxes = new ArrayList<>();
record = initRecord(uploadId, etagIdxes);
record = initRecord(ret, etagIdxes);
}
return record;
}

String init(String bucket, String base64Key, String upToken) throws QiniuException {
InitRet init(String bucket, String base64Key, String upToken) throws QiniuException {
String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads";
byte[] data = new byte[0];
StringMap headers = new StringMap().put("Authorization", "UpToken " + upToken);
Expand Down Expand Up @@ -205,9 +210,9 @@ String init(String bucket, String base64Key, String upToken) throws QiniuExcepti
}

try {
String uploadId = res.jsonToMap().get("uploadId").toString();
if (uploadId.length() > 10) {
return uploadId;
InitRet ret = res.jsonToObject(InitRet.class);
if (ret != null && ret.uploadId != null && ret.uploadId.length() > 10 && ret.expireAt > 1000) {
return ret;
}
} catch (Exception e) {
// ignore, see next line
Expand All @@ -233,7 +238,7 @@ private boolean useParallel(ExecutorService pool, BlockData blockData, Record re
}

private void seqUpload(BlockData blockData, Token token, String bucket,
String base64Key, Record record) throws QiniuException {
String base64Key, Record record) throws QiniuException {
final String uploadId = record.uploadId;
final List<EtagIdx> etagIdxes = record.etagIdxes;
RetryCounter counter = new NormalRetryCounter(retryMax);
Expand Down Expand Up @@ -262,8 +267,8 @@ private void seqUpload(BlockData blockData, Token token, String bucket,
}

private void parallelUpload(BlockData blockData, final Token token,
final String bucket, final String base64Key, Record record,
boolean needRecord, ExecutorService pool, int maxRunningBlock) throws QiniuException {
final String bucket, final String base64Key, Record record,
boolean needRecord, ExecutorService pool, int maxRunningBlock) throws QiniuException {
final String uploadId = record.uploadId;
final List<EtagIdx> etagIdxes = record.etagIdxes;
final RetryCounter counter = new AsyncRetryCounter(retryMax);
Expand Down Expand Up @@ -329,11 +334,12 @@ private boolean alreadyDone(int index, List<EtagIdx> etagIdxes) {
}

private void waitingEnough(int maxRunningBlock, List<Future<EtagIdx>> futures) {
for (; ; ) {
while (true) {
if (futures.size() < maxRunningBlock) {
break;
}
int done = 0;
// max(len(futures)) = 10000
for (Future<EtagIdx> future : futures) {
if (future.isDone()) {
done++;
Expand All @@ -342,16 +348,24 @@ private void waitingEnough(int maxRunningBlock, List<Future<EtagIdx>> futures) {
if (futures.size() - done < maxRunningBlock) {
break;
}
sleepMillis(500);
sleepMillis(100);
}
}

EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadId, byte[] data,
int dataLength, int partNum, RetryCounter counter) throws QiniuException {
Response res = uploadBlockWithRetry(bucket, base64Key, token, uploadId, data, dataLength, partNum, counter);
String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId + "/" + partNum;
String md5 = Md5.md5(data, 0, dataLength);
StringMap headers = new StringMap().
put("Content-MD5", md5).
put("Authorization", "UpToken " + token.getUpToken());

Response res = uploadBlockWithRetry(url, token, headers, data, dataLength, counter);
try {
String etag = res.jsonToMap().get("etag").toString();
if (etag.length() > 10) {
StringMap m = res.jsonToMap();
// String qMd5 = m.get("md5").toString();
String etag = m.get("etag").toString();
if (/*md5.equals(qMd5) && */etag.length() > 10) {
return new EtagIdx(etag, partNum, dataLength);
}
} catch (Exception e) {
Expand All @@ -360,14 +374,9 @@ EtagIdx uploadBlock(String bucket, String base64Key, Token token, String uploadI
throw new QiniuException(res);
}

Response uploadBlockWithRetry(String bucket, String base64Key, Token token, String uploadId,
byte[] data, int dataLength, int partNum, RetryCounter counter)
Response uploadBlockWithRetry(String url, Token token, StringMap headers,
byte[] data, int dataLength, RetryCounter counter)
throws QiniuException {
String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId + "/" + partNum;
StringMap headers = new StringMap().
put("Content-MD5", Md5.md5(data, 0, dataLength)).
put("Authorization", "UpToken " + token.getUpToken());

// 在 最多重试次数 范围内, 每个块至多上传 3 次 //
// 1
Response res = uploadBlock1(url, data, dataLength, headers, true);
Expand Down Expand Up @@ -432,7 +441,7 @@ Response makeFile(String bucket, String base64Key, Token token, String uploadId,
String fileName, OptionsMeta params) throws QiniuException {
String url = host + "/buckets/" + bucket + "/objects/" + base64Key + "/uploads/" + uploadId;
final StringMap headers = new StringMap().put("Authorization", "UpToken " + token.getUpToken());
sortAsc(etags);
sortByPartNumberAsc(etags);
byte[] data = new MakefileBody(etags, fileName, params)
.json().getBytes(Charset.forName("UTF-8"));

Expand Down Expand Up @@ -493,7 +502,7 @@ private String parseBucket(String upToken) throws QiniuException {
}
}

static void sortAsc(List<EtagIdx> etags) {
static void sortByPartNumberAsc(List<EtagIdx> etags) {
Collections.sort(etags, new Comparator<EtagIdx>() {
@Override
public int compare(EtagIdx o1, EtagIdx o2) {
Expand Down Expand Up @@ -546,9 +555,9 @@ public String json() {


class EtagIdx {
String etag;
int partNumber;
transient int size;
String etag; // mkfile
int partNumber; // mkfile
transient int size; // 本地使用,不写入 json 断点记录 //

EtagIdx(String etag, int idx, int size) {
this.etag = etag;
Expand All @@ -567,18 +576,20 @@ public String toString() {


class Record {
long createdTime;
// second
long expireAt;
String uploadId;
long size;
long size; // sum(current putParts size) EtagIdx.size 未序列化,历史断点记录没法获取到已上传大小 //
List<EtagIdx> etagIdxes;
}


Record initRecord(String uploadId, List<EtagIdx> etagIdxes) {
Record initRecord(InitRet ret, List<EtagIdx> etagIdxes) {
Record record = new Record();
record.createdTime = System.currentTimeMillis();
record.uploadId = uploadId;
record.size = 0;
record.uploadId = ret.uploadId;
//// 服务端 7 天内有效,设置 5 天 ////
record.expireAt = ret.expireAt - 3600 * 24 * 2;
record.size = 0; // 本次上传到需要写断点记录时上传的总大小,EtagIdx.size 未序列化,历史断点记录没法获取到已上传大小 //
record.etagIdxes = etagIdxes != null ? etagIdxes : new ArrayList<EtagIdx>();

return record;
Expand Down Expand Up @@ -620,22 +631,21 @@ public void delRecord() {

public void syncRecord(Record record) {
if (needRecord && recorder != null && record.etagIdxes.size() > 0) {
sortAsc(record.etagIdxes);
sortByPartNumberAsc(record.etagIdxes);
recorder.set(recordFileKey, new Gson().toJson(record).getBytes(Charset.forName("UTF-8")));
}
}

public boolean isActiveRecord(Record record, BlockData blockData) {
//// 服务端 7 天内有效,设置 5 天 ////
boolean isOk = record != null
&& record.createdTime > System.currentTimeMillis() - 1000 * 3600 * 24 * 5
&& record.expireAt < (System.currentTimeMillis() / 1000)
&& !StringUtils.isNullOrEmpty(record.uploadId)
&& record.etagIdxes != null && record.etagIdxes.size() > 0
&& record.size > 0 && record.size <= blockData.size();
if (isOk) {
int p = 0;
// PartNumber start with 1 and increase by 1 //
// 当前文件各块串行 if (ei.idx == p + 1) . 若并行,需额外考虑 //
// 并行上传,中间块可能缺失(上传失败) //
for (EtagIdx ei : record.etagIdxes) {
if (ei.partNumber > p) {
p = ei.partNumber;
Expand All @@ -652,6 +662,11 @@ public boolean isActiveRecord(Record record, BlockData blockData) {

///////////////////////////////////////

class InitRet {
String uploadId;
long expireAt;
}


abstract static class BlockData {
protected final int blockDataSize;
Expand Down Expand Up @@ -944,6 +959,7 @@ public String getFileName() {


static void sleepMillis(long millis) {
// LockSupport.parkNanos(millis * 1000 * 1000); // or
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -975,8 +991,8 @@ public OptionsMeta setMimeType(String mimeType) {
}

/**
* @param key start with X-Qn-Meta-
* @param value not null or empty
* @param key start with X-Qn-Meta-
* @param value not null or empty
*/
public OptionsMeta addMetadata(String key, String value) {
if (metadata == null) {
Expand All @@ -987,8 +1003,8 @@ public OptionsMeta addMetadata(String key, String value) {
}

/**
* @param key start with x:
* @param value not null or empty
* @param key start with x:
* @param value not null or empty
*/
public OptionsMeta addCustomVar(String key, String value) {
if (customVars == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.junit.Test;
import test.com.qiniu.TempFile;
import test.com.qiniu.TestConfig;
import test.com.qiniu.util.EtagTest;

import java.io.File;
import java.io.IOException;
Expand All @@ -22,20 +23,31 @@
import static org.junit.Assert.assertEquals;

public class FixBlockUploaderWithRecorderTest {
int blockSize = 1024 * 1024 * 8;
int blockSize = 1024 * 1025;
Configuration config;
Client client;
FixBlockUploader up;
String bucket;
Recorder recorder;
BucketManager bm;

@Before
public void init() {
init2(false);
init2(true);
}

private void init2(boolean useHttpsDomains) {
config = new Configuration();
Region rhttps = new Region.Builder(Region.region0())
.accUpHost("up-dev.qiniu.io")
.srcUpHost("up-dev.qiniu.io")
.rsHost("rs-dev.qiniu.io")
.rsfHost("rsf-dev.qiniu.io").build();
Region rhttp = new Region.Builder(Region.region0())
.accUpHost("up.dev-kodo.dev.qiniu.io")
.srcUpHost("up.dev-kodo.dev.qiniu.io")
.rsHost("rs.dev-kodo.dev.qiniu.io")
.rsfHost("rsf.dev-kodo.dev.qiniu.io").build();
config = new Configuration(useHttpsDomains ? rhttps : rhttp);
config.useHttpsDomains = useHttpsDomains;
client = new Client(config);

Expand All @@ -47,7 +59,10 @@ private void init2(boolean useHttpsDomains) {
e.printStackTrace();
}
up = new FixBlockUploader(blockSize, config, client, recorder);
bm = new BucketManager(TestConfig.testAuth, config);
bucket = TestConfig.testBucket_as0;

bucket = "publicbucket_z0";
}

@Test
Expand All @@ -57,10 +72,10 @@ public void breakThenUpload1() throws IOException {

@Test
public void breakThenUpload2() throws IOException {
ExecutorService pool = new ThreadPoolExecutor(0, 2,
ExecutorService pool = new ThreadPoolExecutor(0, 10,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
breakThenUpload(pool, Executors.newFixedThreadPool(2), Executors.newCachedThreadPool(), 10, 10, 2);
breakThenUpload(pool, Executors.newFixedThreadPool(2), Executors.newCachedThreadPool(), 10, 10, 7);
}


Expand Down Expand Up @@ -91,12 +106,12 @@ public void breakThenUpload(ExecutorService pool1, ExecutorService pool2, Execut

public void breakThenUpload(final ExecutorService pool1, final ExecutorService pool2, final ExecutorService pool3,
int upSecondsTime1, int upSecondsTime2, final int maxRunningBlock) throws IOException {
final long size = 1024 * 53 + 1039;
final long size = 1024 * 83 + 1039;
final String expectKey = "\r\n?&r=" + size + "k" + System.currentTimeMillis();
final File f = TempFile.createFileOld(size);
final FixBlockUploader.FileBlockData fbd = new FixBlockUploader.FileBlockData(blockSize, f);
System.out.println(f.getAbsolutePath());
final String etag = Etag.file(f);
final String etag = EtagTest.etagV2(f, blockSize);
final String returnBody = "{\"key\":\"$(key)\",\"hash\":\"$(etag)\",\"fsize\":\"$(fsize)\""
+ ",\"fname\":\"$(fname)\",\"mimeType\":\"$(mimeType)\"}";

Expand All @@ -105,6 +120,12 @@ public void breakThenUpload(final ExecutorService pool1, final ExecutorService p
final String token = TestConfig.testAuth.uploadToken(bucket, expectKey, 3600, p);
final int[] t1Finished = {0};

try {
bm.delete(bucket, expectKey);
} catch (Exception e) {
// do nothing
}

Thread t1 = new Thread() {
@Override
public void run() {
Expand Down Expand Up @@ -230,6 +251,11 @@ public void run() {
throw e;
} finally {
TempFile.remove(f);
try {
bm.delete(bucket, expectKey);
} catch (Exception e) {
// do nothing
}
}
}

Expand Down
Loading