PHP 处理返回大文件的接口
处理的主要逻辑点在 BasicAuth请求,文件流处理,行读取,CSV-Tab格式化,大数组分段批量保存,Text类型容量;
实际场景:数据中心提供的一份情报数据1.6w条记录,每条记录0.5k,字段10+;
接口鉴权:
basic Auth使用GuzzleHttp请求
- option 设置 auth=>[name,password]
- option 设置超时时间
- option 设置 auth=>[name,password]
获取返回数据
- $response = $client->get(…);
- 获取返回数据的 stream $response->getBody();
- 数据缓存路径 “ php:/temp “ (可自定义配置php.ini)
- 判断文件大小,获取内容,写入指定路径 $tmpPath
- 行读取 fgets() | SplFileObject
- 格式化 Tab str_getcsv()
- 返回数组
大数组写入 MySQL
- 批量写入 MySQL最大占位符限制 65535
SQLSTATE[HY000]: General error: 1390 Prepared statement contains too many placeholders.
- MAX_NUM = int( 65535 / count( saveData[0] ) )
- 分段写入 array_chunk( , MAX_NUM , ) 返回分段数组列表
- 遍历写入 MySQL
- 写入成功后 发送消息通知其他业务
- 释放内存,删除临时文件
MySQL text 类型大小
ERROR: SQLSTATE[22001]: String data, right truncated: 1406 Data too long for column
1
2
3
4
5TEXT 65,535 bytes ~64kb
MEDIUMTEXT 16,777,215 bytes ~16Mb
LONGTEXT 4,294,967,295 bytes ~4Gb
ERROR: SQLSTATE[22001]: String data, right truncated: 1406 Data too long for column
示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160namespace App\Console\Commands;
use App\Jobs\QueueJob;
use App\Models\SpecialName;
use Exception;
use GuzzleHttp\Client;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
/**
* Class SyncNetlabBlock
* @package App\Console\Commands
*/
class SyncNetlabBlock extends Command
{
protected $signature = 'sync:netlab-blocklist';
protected $description = 'xxxxxxx';
/**
* 获取数据
* 查询匹配特定域名解析 special_name
* 有则不添加
* 无则新增
* @throws Exception
*/
public function handle()
{
$saveData = [];
$queueData = [];
$specialNameModel = new SpecialName();
$netlabBlockList = $this->getNetlabData();
$netlabBlockList = $this->distinctDomainName($netlabBlockList);
// 构造新增数据
foreach ($netlabBlockList as $qname => $value) {
$item = [
'qname' => $qname,
......
];
$saveData[] = $item;
$queueData[] = $specialNameModel->formatMessage($item);
}
//分段写入数据库
if (count($saveData[0]) * count($saveData) > 65535) {
print_r("开启分段写入Mysql:" . count($saveData) . PHP_EOL);
$this->partitionSave($saveData, $queueData);
return;
}
// 写入数据库
if ($saveData && DB::table($specialNameModel->getTable())->insert($saveData)) {
// 发送通知
$specialNameModel->dispatch(new QueueJob(QueueJob::ADD_SPECIAL_NAME, $queueData));
}
// 释放内存
unset($saveData, $queueData, $netlabBlockList);
}
/**
* 分段 插入数据库-发送消息
* @param $saveData
* @param $queueData
*
* @throws Exception
*/
public function partitionSave($saveData, $queueData)
{
$specialNameModel = new SpecialName();
$groupSave = array_chunk($saveData, 5000, true);
$groupQueue = array_chunk($queueData, 5000, true);
for ($i = 0; $i < count($groupSave); $i++) {
print_r("chunk_{$i}保存成功_COUNT()" . count($groupSave[$i]) . PHP_EOL);
// 写入数据库
if ($groupSave[$i] && DB::table($specialNameModel->getTable())->insert($groupSave[$i])) {
// 发送通知
$specialNameModel->dispatch(new QueueJob(QueueJob::ADD_SPECIAL_NAME, $groupQueue[$i]));
}
}
}
/**
* 请求netlab接口获取数据
* @return array
*/
public function getNetlabData()
{
$netlabBlockList = $options = [];
$httpClient = new Client();
$path = "/tmp/netlab_blocklist_" . date("YmdHis") . ".txt";
$url = "xxxxx";
$user = 'xxxxx';
$password = 'xxxxx';
$options['auth'] = [$user, $password];
$options['timeout'] = 7200;
$response = $httpClient->get($url, $options);
$data = $response->getBody();
$wreturn = file_put_contents($path, $data->getContents());
if ($wreturn) {
$netlabBlockList = $this->readBlockListByLine($path);
}
//删除文件
unlink($path);
return $netlabBlockList;
}
/**
* @param $netlabBlockList
* @return array
*/
public function distinctDomainName($netlabBlockList)
{
// 获取所有数据库数据-A记录域名-过滤已存在数据
$queryData = SpecialName::query()->where('qtype', SpecialName::DEFAULT_TYPE_A)
->get('qname')->toArray();
$queryData = array_column($queryData, 'qname');
foreach ($queryData as $val) {
if (isset($netlabBlockList[$val])) {
unset($netlabBlockList[$val]);
}
}
return $netlabBlockList;
}
/**
* SplFileObject
* 行读取-转换为数组
* @param $fileUrl
* @param bool $isTab
* @return array
*/
public function readBlockListByLine($fileUrl, $isTab = true)
{
file_exists($fileUrl) or exit("There is no file");
$file = fopen($fileUrl, "r");
$delimiter = $isTab ? "\t" : ",";
$buffer = [];
$i = 0;
while (!feof($file)) {
$line = fgets($file);//fgets()函数从文件指针中读取一行
$data = str_getcsv($line, $delimiter);
if (count($data) < 2) {
continue;
}
$buffer[$data[1]] = $data[10];
$i++;
}
fclose($file);
$buffer = array_filter($buffer);
return $buffer;
}
}