Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 155 additions & 1 deletion Aliyun/Log/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,159 @@ public function putLogs(Aliyun_Log_Models_PutLogsRequest $request) {
return new Aliyun_Log_Models_PutLogsResponse ( $header );
}

/**
* create shipper service
* @param Aliyun_Log_Models_CreateShipperRequest $request
* return Aliyun_Log_Models_CreateShipperResponse
*/
public function createShipper(Aliyun_Log_Models_CreateShipperRequest $request){
$headers = array();
$params = array();
$resource = "/logstores/".$request->getLogStore()."/shipper";
$project = $request->getProject () !== null ? $request->getProject () : '';
$headers["Content-Type"] = "application/json";

$body = array(
"shipperName" => $request->getShipperName(),
"targetType" => $request->getTargetType(),
"targetConfiguration" => $request->getTargetConfigration()
);
$body_str = json_encode($body);
$headers["x-log-bodyrawsize"] = strlen($body_str);
list($resp, $header) = $this->send("POST", $project,$body_str,$resource,$params,$headers);
$requestId = isset($header['x-log-requestid']) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson($resp, $requestId);
return new Aliyun_Log_Models_CreateShipperResponse($resp, $header);
}

/**
* create shipper service
* @param Aliyun_Log_Models_CreateShipperRequest $request
* return Aliyun_Log_Models_CreateShipperResponse
*/
public function updateShipper(Aliyun_Log_Models_UpdateShipperRequest $request){
$headers = array();
$params = array();
$resource = "/logstores/".$request->getLogStore()."/shipper/".$request->getShipperName();
$project = $request->getProject () !== null ? $request->getProject () : '';
$headers["Content-Type"] = "application/json";

$body = array(
"shipperName" => $request->getShipperName(),
"targetType" => $request->getTargetType(),
"targetConfiguration" => $request->getTargetConfigration()
);
$body_str = json_encode($body);
$headers["x-log-bodyrawsize"] = strlen($body_str);
list($resp, $header) = $this->send("PUT", $project,$body_str,$resource,$params,$headers);
$requestId = isset($header['x-log-requestid']) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson($resp, $requestId);
return new Aliyun_Log_Models_UpdateShipperResponse($resp, $header);
}

/**
* get shipper tasks list, max 48 hours duration supported
* @param Aliyun_Log_Models_CreateShipperRequest $request
* return Aliyun_Log_Models_CreateShipperResponse
*/
public function getShipperTasks(Aliyun_Log_Models_GetShipperTasksRequest $request){
$headers = array();
$params = array(
'from' => $request->getStartTime(),
'to' => $request->getEndTime(),
'status' => $request->getStatusType(),
'offset' => $request->getOffset(),
'size' => $request->getSize()
);
$resource = "/logstores/".$request->getLogStore()."/shipper/".$request->getShipperName()."/tasks";
$project = $request->getProject () !== null ? $request->getProject () : '';
$headers["x-log-bodyrawsize"] = 0;
$headers["Content-Type"] = "application/json";

list($resp, $header) = $this->send("GET", $project,null,$resource,$params,$headers);
$requestId = isset($header['x-log-requestid']) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson($resp, $requestId);
return new Aliyun_Log_Models_GetShipperTasksResponse($resp, $header);
}

/**
* retry shipper tasks list by task ids
* @param Aliyun_Log_Models_CreateShipperRequest $request
* return Aliyun_Log_Models_CreateShipperResponse
*/
public function retryShipperTasks(Aliyun_Log_Models_RetryShipperTasksRequest $request){
$headers = array();
$params = array();
$resource = "/logstores/".$request->getLogStore()."/shipper/".$request->getShipperName()."/tasks";
$project = $request->getProject () !== null ? $request->getProject () : '';

$headers["Content-Type"] = "application/json";
$body = $request->getTaskLists();
$body_str = json_encode($body);
$headers["x-log-bodyrawsize"] = strlen($body_str);
list($resp, $header) = $this->send("PUT", $project,$body_str,$resource,$params,$headers);
$requestId = isset($header['x-log-requestid']) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson($resp, $requestId);
return new Aliyun_Log_Models_RetryShipperTasksResponse($resp, $header);
}

/**
* delete shipper service
* @param Aliyun_Log_Models_DeleteShipperRequest $request
* return Aliyun_Log_Models_DeleteShipperResponse
*/
public function deleteShipper(Aliyun_Log_Models_DeleteShipperRequest $request){
$headers = array();
$params = array();
$resource = "/logstores/".$request->getLogStore()."/shipper/".$request->getShipperName();
$project = $request->getProject () !== null ? $request->getProject () : '';
$headers["x-log-bodyrawsize"] = 0;
$headers["Content-Type"] = "application/json";

list($resp, $header) = $this->send("DELETE", $project,null,$resource,$params,$headers);
$requestId = isset($header['x-log-requestid']) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson($resp, $requestId);
return new Aliyun_Log_Models_DeleteShipperResponse($resp, $header);
}

/**
* get shipper config service
* @param Aliyun_Log_Models_DeleteShipperRequest $request
* return Aliyun_Log_Models_DeleteShipperResponse
*/
public function getShipperConfig(Aliyun_Log_Models_GetShipperConfigRequest $request){
$headers = array();
$params = array();
$resource = "/logstores/".$request->getLogStore()."/shipper/".$request->getShipperName();
$project = $request->getProject () !== null ? $request->getProject () : '';
$headers["x-log-bodyrawsize"] = 0;
$headers["Content-Type"] = "application/json";

list($resp, $header) = $this->send("GET", $project,null,$resource,$params,$headers);
$requestId = isset($header['x-log-requestid']) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson($resp, $requestId);
return new Aliyun_Log_Models_GetShipperConfigResponse($resp, $header);
}

/**
* list shipper service
* @param Aliyun_Log_Models_DeleteShipperRequest $request
* return Aliyun_Log_Models_DeleteShipperResponse
*/
public function listShipper(Aliyun_Log_Models_ListShipperRequest $request){
$headers = array();
$params = array();
$resource = "/logstores/".$request->getLogStore()."/shipper";
$project = $request->getProject () !== null ? $request->getProject () : '';
$headers["x-log-bodyrawsize"] = 0;
$headers["Content-Type"] = "application/json";

list($resp, $header) = $this->send("GET", $project,null,$resource,$params,$headers);
$requestId = isset($header['x-log-requestid']) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson($resp, $requestId);
return new Aliyun_Log_Models_ListShipperResponse($resp, $header);
}

/**
* create logstore
* Unsuccessful opertaion will cause an Aliyun_Log_Exception.
Expand All @@ -301,6 +454,7 @@ public function createLogstore(Aliyun_Log_Models_CreateLogstoreRequest $request)
"shardCount" => (int)($request -> getShardCount())
);
$body_str = json_encode($body);
$headers["x-log-bodyrawsize"] = strlen($body_str);
list($resp,$header) = $this -> send("POST",$project,$body_str,$resource,$params,$headers);
$requestId = isset ( $header ['x-log-requestid'] ) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson ( $resp, $requestId );
Expand All @@ -318,7 +472,6 @@ public function updateLogstore(Aliyun_Log_Models_UpdateLogstoreRequest $request)
$headers = array ();
$params = array ();
$project = $request->getProject () !== null ? $request->getProject () : '';
$headers["x-log-bodyrawsize"] = 0;
$headers["Content-Type"] = "application/json";
$body = array(
"logstoreName" => $request -> getLogstore(),
Expand All @@ -327,6 +480,7 @@ public function updateLogstore(Aliyun_Log_Models_UpdateLogstoreRequest $request)
);
$resource = '/logstores/'.$request -> getLogstore();
$body_str = json_encode($body);
$headers["x-log-bodyrawsize"] = strlen($body_str);
list($resp,$header) = $this -> send("PUT",$project,$body_str,$resource,$params,$headers);
$requestId = isset ( $header ['x-log-requestid'] ) ? $header ['x-log-requestid'] : '';
$resp = $this->parseToJson ( $resp, $requestId );
Expand Down
67 changes: 67 additions & 0 deletions Aliyun/Log/Logger.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php
/**
* Copyright (C) Alibaba Cloud Computing
* All rights reserved
*/

class Aliyun_Log_Logger{

protected $client;

protected $project;

protected $logstore;

public function __construct($client, $project, $logstore)
{
$this ->client = $client;
$this->logstore=$logstore;
$this->project=$project;
}

public function log($logLevel, $logMessage, $topic){
if(!Aliyun_Log_Models_logLevel_LogLevel::isValidValue($logLevel)){
throw new Exception('logLevel value is invalid!');
}
$ip = $this->getLocalIp();
$contents = array( // key-value pair
'time'=>date('m/d/Y h:i:s a', time()),
'message'=> $logMessage,
'loglevel'=> $logLevel
);
try {
$logItem = new Aliyun_Log_Models_LogItem();
$logItem->setTime(time());
$logItem->setContents($contents);
$logitems = array($logItem);
$request = new Aliyun_Log_Models_PutLogsRequest($this->project, $this->logstore,
$topic, $ip, $logitems);
$response = $this->client->putLogs($request);
} catch (Aliyun_Log_Exception $ex) {
var_dump($ex);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

所有的exception 都直接throw 出去,不要dump 下来

} catch (Exception $ex) {
var_dump($ex);
}
}

public function logBatch($logItems, $topic){
$ip = $this->getLocalIp();
try{
$request = new Aliyun_Log_Models_PutLogsRequest($this->project, $this->logstore,
$topic, $ip, $logItems);
$response = $this->client->putLogs($request);
} catch (Aliyun_Log_Exception $ex) {
var_dump($ex);
} catch (Exception $ex) {
var_dump($ex);
}
}

private function getLocalIp(){
$local_ip = getHostByName(php_uname('n'));
if(strlen($local_ip) == 0){
$local_ip = getHostByName(getHostName());
}
return $local_ip;
}
}
140 changes: 140 additions & 0 deletions Aliyun/Log/Models/LogBatch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
<?php
/**
* Copyright (C) Alibaba Cloud Computing
* All rights reserved
*/

/**
* Class Aliyun_Log_Models_LogBatch
* in some cases the http port is quite limited, so user could config a batch logger,
* which will cache some log and send to server in bulk
*/
class Aliyun_Log_Models_LogBatch{

private $logItems = [];

private $arraySize;

private $logger;

private $sem_id;

private $shm_id;

private $topic;

private $waitTime;

private $previousLogTime;

/**
* Aliyun_Log_Models_LogBatch constructor.
* @param Aliyun_Log_Logger $logger
* @param $topic
* @param null $cacheLogCount max log items limitation, by default it's 100
* @param null $cacheLogWaitTime max thread waiting time, bydefault it's 5 seconds
*/
public function __construct(Aliyun_Log_Logger $logger, $topic, $cacheLogCount = null, $cacheLogWaitTime = null)
{
if(NULL === $cacheLogCount || !is_integer($cacheLogCount)){
$this->arraySize = 100;
}else{
$this->arraySize = $cacheLogCount;
}

if(NULL === $cacheLogWaitTime || !is_integer($cacheLogWaitTime)){
$this->waitTime = 5;
}else{
$this->waitTime = $cacheLogWaitTime;
}

$this->logger = $logger;
$this->topic = $topic;

$time_stampe = time();
$MEMSIZE = 5120;
$SEMKEY = $time_stampe;
$SHMKEY = $time_stampe+2233;

$this->sem_id = sem_get($SEMKEY, 10); // support 10 processes run simultaneously
$this->shm_id = shm_attach($SHMKEY, $MEMSIZE);
if(shm_has_var($this->shm_id, 1)){
shm_remove_var($this->shm_id, 1);
}
shm_put_var($this->shm_id, 1, $this->logItems);

}

/**
* log expected message with proper level
* @param $logMessage
* @param $logLevel
*/
public function log($logMessage, $logLevel){
Copy link
Collaborator

@suntingtao007 suntingtao007 Jan 10, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对于log的话,单个message 有点太简单了,也可以直接同时写入多个key:value的模式 , 单个message 是一种特例。
可以提供一个log(level, array)的接口 , log(level, message) => log (leve , array("msg"=>message));

$previousCallTime = $this->previousLogTime;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为了易用,这个log 接口,可以再加一个bool的参数, use_buffer (默认为true),如果设置为false, 这立刻flush出去

if(NULL === $previousCallTime){
$previousCallTime = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个时间的判断逻辑有问题,修改如下:

  1. previousCallTime = time() 初始的时候
  2. 每次调用一次log, 检查 time() - previousCallTime > xxx 是否满足
  3. 如果满足 send data, 并且更新previousCallTime = time()
  4. 如果第二不检查不满足, 则不send data, 也不更新 previousCallTime。

}
$this->previousLogTime = time();
if(is_array($logMessage)){
$logItemTemps = array();
foreach ($logMessage as &$logElement){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

当这个作为一个array的时候, 是一条message, 而不是多条,一条message 可以有多个key,value。
传入的是array(其实是一个map)。

$contents = array( // key-value pair
'time'=>date('m/d/Y h:i:s a', time()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这边的time 字段不需要了,下面有

'message'=> $logElement,
'loglevel'=> $logLevel
);
$logItem = new Aliyun_Log_Models_LogItem();
$logItem->setTime(time());
$logItem->setContents($contents);
array_push($logItemTemps, $logItem);


}
$this->logger->logBatch($logItemTemps, $this->topic);
}else{
$contents = array( // key-value pair
'time'=>date('m/d/Y h:i:s a', time()),
'message'=> $logMessage,
'loglevel'=> $logLevel
);
$logItem = new Aliyun_Log_Models_LogItem();
$logItem->setTime(time());
$logItem->setContents($contents);
if(shm_has_var($this->shm_id, 1)){
$logItems = shm_get_var($this->shm_id, 1);
array_push($logItems, $logItem);

if((sizeof($logItems) == $this->arraySize || $this->previousLogTime - $previousCallTime > 5000)
&& $previousCallTime > 0){
$this->logger->logBatch($logItems, $this->topic);
$logItems = [];
}

shm_remove_var($this->shm_id, 1);
shm_put_var($this->shm_id, 1, $logItems);
$this->logItems = $logItems;
}
}
}

/**
* manually flush all cached log to log server
*/
public function logFlush(){
if(sizeof($this->logItems) > 0){
$this->logger->logBatch($this->logItems, $this->topic);
$this->logItems = [];
}
shm_remove_var($this->shm_id, 1);
}

function __destruct() {
if(sizeof($this->logItems) > 0){
$this->logger->logBatch($this->logItems, $this->topic);
}

sem_remove($this->sem_id);
shm_remove($this->shm_id);
}
}
Loading