PHP 处理返回大文件的接口

处理的主要逻辑点在 BasicAuth请求,文件流处理,行读取,CSV-Tab格式化,大数组分段批量保存,Text类型容量;

实际场景:数据中心提供的一份情报数据1.6w条记录,每条记录0.5k,字段10+;

  • 接口鉴权:
    basic Auth

  • 使用GuzzleHttp请求

    • option 设置 auth=>[name,password]
      • option 设置超时时间
  • 获取返回数据

    • $response = $client->get(…);
    • 获取返回数据的 stream $response->getBody();
    • 数据缓存路径 “ php:/temp “ (可自定义配置php.ini)
    • 判断文件大小,获取内容,写入指定路径 $tmpPath
    • 行读取 fgets() | SplFileObject
    • 格式化 Tab str_getcsv()
    • 返回数组
  • 大数组写入 MySQL

    • 批量写入 MySQL最大占位符限制 65535
    • SQLSTATE[HY000]: General error: 1390 Prepared statement contains too many placeholders.
    • MAX_NUM = int( 65535 / count( saveData[0] ) )
    • 分段写入 array_chunk( , MAX_NUM , ) 返回分段数组列表
    • 遍历写入 MySQL
    • 写入成功后 发送消息通知其他业务
    • 释放内存,删除临时文件
  • MySQL text 类型大小

    • ERROR: SQLSTATE[22001]: String data, right truncated: 1406 Data too long for column
      1
      2
      3
      4
      5
      TEXT 65,535 bytes ~64kb
      MEDIUMTEXT 16,777,215 bytes ~16Mb
      LONGTEXT 4,294,967,295 bytes ~4Gb

      ERROR: SQLSTATE[22001]: String data, right truncated: 1406 Data too long for column
  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    namespace App\Console\Commands;

    use App\Jobs\QueueJob;
    use App\Models\SpecialName;
    use Exception;
    use GuzzleHttp\Client;
    use Illuminate\Console\Command;
    use Illuminate\Support\Facades\DB;


    /**
    * Class SyncNetlabBlock
    * @package App\Console\Commands
    */
    class SyncNetlabBlock extends Command
    {
    protected $signature = 'sync:netlab-blocklist';

    protected $description = 'xxxxxxx';

    /**
    * 获取数据
    * 查询匹配特定域名解析 special_name
    * 有则不添加
    * 无则新增
    * @throws Exception
    */
    public function handle()
    {
    $saveData = [];
    $queueData = [];
    $specialNameModel = new SpecialName();
    $netlabBlockList = $this->getNetlabData();
    $netlabBlockList = $this->distinctDomainName($netlabBlockList);
    // 构造新增数据
    foreach ($netlabBlockList as $qname => $value) {
    $item = [
    'qname' => $qname,
    ......
    ];
    $saveData[] = $item;
    $queueData[] = $specialNameModel->formatMessage($item);
    }
    //分段写入数据库
    if (count($saveData[0]) * count($saveData) > 65535) {
    print_r("开启分段写入Mysql:" . count($saveData) . PHP_EOL);
    $this->partitionSave($saveData, $queueData);
    return;
    }

    // 写入数据库
    if ($saveData && DB::table($specialNameModel->getTable())->insert($saveData)) {
    // 发送通知
    $specialNameModel->dispatch(new QueueJob(QueueJob::ADD_SPECIAL_NAME, $queueData));
    }
    // 释放内存
    unset($saveData, $queueData, $netlabBlockList);
    }


    /**
    * 分段 插入数据库-发送消息
    * @param $saveData
    * @param $queueData
    *
    * @throws Exception
    */
    public function partitionSave($saveData, $queueData)
    {
    $specialNameModel = new SpecialName();
    $groupSave = array_chunk($saveData, 5000, true);
    $groupQueue = array_chunk($queueData, 5000, true);
    for ($i = 0; $i < count($groupSave); $i++) {
    print_r("chunk_{$i}保存成功_COUNT()" . count($groupSave[$i]) . PHP_EOL);
    // 写入数据库
    if ($groupSave[$i] && DB::table($specialNameModel->getTable())->insert($groupSave[$i])) {
    // 发送通知
    $specialNameModel->dispatch(new QueueJob(QueueJob::ADD_SPECIAL_NAME, $groupQueue[$i]));
    }
    }
    }

    /**
    * 请求netlab接口获取数据
    * @return array
    */
    public function getNetlabData()
    {
    $netlabBlockList = $options = [];
    $httpClient = new Client();

    $path = "/tmp/netlab_blocklist_" . date("YmdHis") . ".txt";
    $url = "xxxxx";
    $user = 'xxxxx';
    $password = 'xxxxx';
    $options['auth'] = [$user, $password];
    $options['timeout'] = 7200;
    $response = $httpClient->get($url, $options);
    $data = $response->getBody();
    $wreturn = file_put_contents($path, $data->getContents());
    if ($wreturn) {
    $netlabBlockList = $this->readBlockListByLine($path);
    }

    //删除文件
    unlink($path);
    return $netlabBlockList;
    }

    /**
    * @param $netlabBlockList
    * @return array
    */
    public function distinctDomainName($netlabBlockList)
    {
    // 获取所有数据库数据-A记录域名-过滤已存在数据
    $queryData = SpecialName::query()->where('qtype', SpecialName::DEFAULT_TYPE_A)
    ->get('qname')->toArray();
    $queryData = array_column($queryData, 'qname');
    foreach ($queryData as $val) {
    if (isset($netlabBlockList[$val])) {
    unset($netlabBlockList[$val]);
    }
    }
    return $netlabBlockList;
    }

    /**
    * SplFileObject
    * 行读取-转换为数组
    * @param $fileUrl
    * @param bool $isTab
    * @return array
    */
    public function readBlockListByLine($fileUrl, $isTab = true)
    {

    file_exists($fileUrl) or exit("There is no file");

    $file = fopen($fileUrl, "r");
    $delimiter = $isTab ? "\t" : ",";

    $buffer = [];
    $i = 0;
    while (!feof($file)) {
    $line = fgets($file);//fgets()函数从文件指针中读取一行
    $data = str_getcsv($line, $delimiter);
    if (count($data) < 2) {
    continue;
    }
    $buffer[$data[1]] = $data[10];
    $i++;
    }
    fclose($file);
    $buffer = array_filter($buffer);
    return $buffer;
    }

    }