ChrisKim
Do not go gentle into that good night.
颢天

自建 PyPI / Conda 本地缓存或镜像源

Pip 和 Conda 是 Python 的两大软件包管理工具,它们的官方源在国内访问困难,下载速度非常慢。一般情况下我们使用的都是国内的镜像源,例如清华大学的 TUNA 镜像站、阿里云的镜像站。

但是有些软件包体积非常大,安装的时候从镜像站下载下来仍然需要等待很长时间,如果正巧遇到镜像站负载高峰导致下载速度缓慢,那更是雪上加霜。

为了防止配环境的时候软件包下载等待时间过长,一个可行的方法就是搭建一个本地镜像源,在下载的时候直接从本地镜像源下载,速度能够达到内网带宽。如果是千兆内网,那么理论可以达到 125MB/s,这个速度即使是好几个 GB 的软件包,也能在半分钟内装好。

1. 前言

首先我们需要知道缓存源和镜像源的区别:

  • 缓存源:初始状态为空。下载请求的软件包没有缓存,则回源到设置的上游镜像源,然后该软件包会被缓存。如果请求的软件包已经被缓存,则直接从本地缓存返回用户。
    • 下载速度:第一次速度 = 通过外网从上游镜像源下载的速度;之后的速度 = 内网带宽速度。
    • 磁盘空间:少。初始时只保存了软件包索引,随着使用过程,软件包被缓存,磁盘占用逐渐变大。
  • 镜像源:初始状态含有所有软件包,并且定时与上游镜像源同步。当下载请求到来时直接返回本地文件。
    • 下载速度:内网带宽速度,即使公网断了也能正常下载。
    • 磁盘空间:极大。完整的镜像都是 10TB+ 级别的,当然我们可以选择镜像一个子集。

通过上面的对比,可以发现这两种方案各有优劣。缓存源类似 CDN 缓存,镜像源则相当于全部复制。

这篇文章选择的方案是:

  • PyPI:使用 devpi (+ Nginx) 搭建缓存源
  • Conda:使用 Python + Nginx 搭建镜像源

PyPI 选择搭建缓存源的原因是 Pypi 的完整库体积过大(目前已经有 16TB,详见 https://pypi.org/stats/),全部镜像一遍成本过高,且平时根本用不到所有软件包,所以选择搭建缓存源。如果你恰好财力雄厚,也可以选择搭建镜像源,可使用 bandersnatch 进行同步(TUNA 就是用的这个程序).

而 Conda 选择搭建镜像源的原因是没有好用的缓存源程序,并且 Conda 的软件包数目比 Pypi 少很多,而且如果只下载 Windows 和 Linux 版本的软件包,搭建镜像源所需储存空间也能够接受(~800GB). 当然如果如果要搭建完整镜像,大小仍然是很夸张的 11TB.

2. 使用 devpi 搭建 PyPI 缓存源

快速配置

devpi 本身是一个 Python 软件包,可通过 pip 下载:pip install devpi

安装好后,首先需要初始化。使用 devpi-init --serverdir=[PATH] 进行初始化,其中 [PATH] 代表程序的工作目录,配置文件和缓存都储存在这个目录下,因此该目录的储存空间一定要充足。如果不指定这个参数,则工作目录默认在用户目录下:~/.devpi/server

初始化后,就可以启动服务器了。使用 devpi-server --host=[HOST] --port=[PORT] --serverdir=[PATH] 启动服务器,其中 [HOST] 为监听的地址,填写 127.0.0.1 则只有本机能访问,填写 0.0.0.0 则任何主机都能访问,[PORT] 为监听的端口,[PATH] 为刚才初始化选择的工作目录。

此时缓存源就已经正常运行了,不过 devpi 默认的上游镜像源是官方源,回源的时候会很慢,我们可以改成国内镜像。首先使用 devpi use http://[HOST]:[PORT]/root/pypi 选择我们刚才搭建的镜像源(此时不要关闭 devpi-server),然后使用 devpi login root 登陆 root 账号,默认密码为空直接回车即可。

然后选择使用以下命令切换上游镜像:

# 清华源
devpi index root/pypi "mirror_web_url_fmt=https://pypi.tuna.tsinghua.edu.cn/simple/{name}/" "mirror_url=https://pypi.tuna.tsinghua.edu.cn/simple/"
# 阿里源
devpi index root/pypi "mirror_web_url_fmt=https://mirrors.aliyun.com/pypi/simple/{name}/" "mirror_url=https://mirrors.aliyun.com/pypi/simple/"

配置 pip 下载源

如果是临时使用的话,在 pip 安装时指定 -i 参数填写下载源即可:

pip install -i [HOST]:[PORT] some-package

pip 需要手动信任非 https 的源,因此需要额外加 --trust-host 参数:

pip install -i http://[HOST]:[PORT]/root/pypi --trust-host [HOST]:[PORT] some-package

如果需要将其设为默认,则需要修改 pip 设置:

pip config set global.index-url http://[HOST]:[PORT]/root/pypi

pip 需要手动信任非 https 的源,因此需要额外修改:

pip config set global.trusted-host [HOST]:[PORT]

高级配置

上面的快速配置只是提供了基本的服务,如果你是安装在主力机上临时使用,这样就已经足够了。如果你想配置到服务器上永久使用,则需要一些高级配置。下面的操作均使用 Linux 系统完成。

首先生成配置文件,使用 devpi-gen-config --host=[HOST] --port=[PORT] --serverdir=[PATH],配置文件就会生成到当前目录的 gen-config 文件夹下。

我们本篇教程用到的是:

  • devpi.service:systemctl 服务配置文件
  • nginx-devpi.conf:Nginx 站点配置文件

首先使用 systemctl 实现服务自启,将配置文件拷贝到服务目录:cp gen-config/devpi.service /etc/systemd/system/

然后启用服务:systemctl enable devpi.service

然后启动服务:systemctl start devpi.service

查看服务状态:systemctl status devpi.service

如果显示绿色则说明服务正常启动了。

然后使用 Nginx 实现反向代理,首先要保证服务器装有 Nginx:apt install nginx

然后将配置文件拷贝到 Nginx 配置目录:cp gen-config/nginx-devpi.conf /etc/nginx/sites-available

然后链接到启动的网站目录:ln -s /etc/nginx/sites-available/nginx-devpi.conf /etc/nginx/sites-enabled/nginx-devpi.conf

最后重载配置文件:systemctl reload nginx

3. 使用 Python + Nginx 搭建 Conda 镜像源

镜像同步

同步上游镜像源使用的是 TUNA 提供的 Python 脚本,开源在 GitHub 上:https://github.com/tuna/tunasync-scripts/blob/master/anaconda.py

该脚本默认上游源为官方源,同步规模为完整同步 (11TB),我将这个脚本进行了调整,具体调整和对应代码行号如下:

  • (19~29) 上游源调整为 TUNA 清华源,加速同步
  • (34~36) 软件包只同步 Linux 64 位、Windows 64 位和通用三种系统架构,减小镜像体积。大家可以根据自己的设备情况进行调整。
  • (38~40, 235~236) Conda 安装包只同步 Linux 64 位、Windows 64 位两种,减小镜像体积。
  • (42~44) 删除全部额外 Conda 频道,减小镜像体积。如果大家需要用到 conda-forge 频道,可以把注释去掉。如果要用到其他频道,可以去 GitHub 原版脚本查找。
  • (69, 80) 哈希校验全部跳过,直接返回 True,加快同步过程。如果大家想要启动校验,则把这两行修改恢复即可。
  • (223~224) 每次同步都完整检查安装包。脚本默认设置的是每次同步 10% 的几率进行完整同步,若不是完整同步,则脚本如果发现最新版已经同步了之后就会直接跳过,不再检查旧版。

如果使用我这个脚本进行同步的话,镜像大小大约为 800GB. 大家可以根据自己的实际情况进行调整。不过有几点需要注意:

  1. 由于 TUNA 源的安装包列表格式和官方源不同,所以如果上游源指定为 TUNA 源时,安装包 (archive 和 miniconda) 无法正常同步。如果想要同步安装包,需要把上游源改回官方源。
  2. Python 元组不能只有一个元素,("main") 这种元组会被直接视为字符串,会导致脚本异常。如果你修改脚本时将一些元组删的只剩一个了,记得保留一个逗号,变成 ("main",) 这样脚本就能正常运行了。

修改版的脚本如下:

#!/usr/bin/env python3
import hashlib
import json
import logging
import os
import errno
import random
import shutil
import subprocess as sp
import tempfile
from email.utils import parsedate_to_datetime
from pathlib import Path

from pyquery import PyQuery as pq

import requests


DEFAULT_CONDA_REPO_BASE = "https://mirrors.tuna.tsinghua.edu.cn/anaconda"
DEFAULT_CONDA_CLOUD_BASE = "https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud"

CONDA_REPO_BASE_URL = os.getenv("CONDA_REPO_URL", "https://mirrors.tuna.tsinghua.edu.cn/anaconda")
CONDA_CLOUD_BASE_URL = os.getenv("CONDA_COULD_URL", "https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud")

# DEFAULT_CONDA_REPO_BASE = "https://repo.continuum.io"
# DEFAULT_CONDA_CLOUD_BASE = "https://conda.anaconda.org"

# CONDA_REPO_BASE_URL = os.getenv("CONDA_REPO_URL", "https://repo.continuum.io")
# CONDA_CLOUD_BASE_URL = os.getenv("CONDA_COULD_URL", "https://conda.anaconda.org")

WORKING_DIR = os.getenv("TUNASYNC_WORKING_DIR")

CONDA_REPOS = ("main", "free", "r", "msys2")
CONDA_ARCHES = (
    "noarch", "linux-64", "win-64"
)

CONDA_INSTALLER_ARCHES = (
    "Linux-x86_64.sh", "Windows-x86_64.exe"
)

CONDA_CLOUD_REPOS = (
    # "conda-forge/linux-64", "conda-forge/win-64", "conda-forge/noarch"
)

EXCLUDED_PACKAGES = (
    "pytorch-nightly", "pytorch-nightly-cpu", "ignite-nightly",
)

# connect and read timeout value
TIMEOUT_OPTION = (7, 10)

# Generate gzip archive for json files, size threshold
GEN_METADATA_JSON_GZIP_THRESHOLD = 1024 * 1024

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] [%(levelname)s] %(message)s",
)

def sizeof_fmt(num, suffix='iB'):
    for unit in ['','K','M','G','T','P','E','Z']:
        if abs(num) < 1024.0:
            return "%3.2f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.2f%s%s" % (num, 'Y', suffix)

def md5_check(file: Path, md5: str = None):
    return True
    m = hashlib.md5()
    with file.open('rb') as f:
        while True:
            buf = f.read(1*1024*1024)
            if not buf:
                break
            m.update(buf)
    return m.hexdigest() == md5

def sha256_check(file: Path, sha256: str = None):
    return True
    m = hashlib.sha256()
    with file.open('rb') as f:
        while True:
            buf = f.read(1*1024*1024)
            if not buf:
                break
            m.update(buf)
    return m.hexdigest() == sha256


def curl_download(remote_url: str, dst_file: Path, sha256: str = None, md5: str = None):
    sp.check_call([
        "curl", "-o", str(dst_file),
        "-sL", "--remote-time", "--show-error",
        "--fail", "--retry", "10", "--speed-time", "15",
        "--speed-limit", "5000", remote_url,
    ])
    if sha256 and (not sha256_check(dst_file, sha256)):
        return "SHA256 mismatch"
    if md5 and (not md5_check(dst_file, md5)):
        return "MD5 mismatch"


def sync_repo(repo_url: str, local_dir: Path, tmpdir: Path, delete: bool):
    logging.info("Start syncing {}".format(repo_url))
    local_dir.mkdir(parents=True, exist_ok=True)

    repodata_url = repo_url + '/repodata.json'
    bz2_repodata_url = repo_url + '/repodata.json.bz2'
    # https://docs.conda.io/projects/conda-build/en/latest/release-notes.html
    # "current_repodata.json" - like repodata.json, but only has the newest version of each file
    current_repodata_url = repo_url + '/current_repodata.json'

    tmp_repodata = tmpdir / "repodata.json"
    tmp_bz2_repodata = tmpdir / "repodata.json.bz2"
    tmp_current_repodata = tmpdir / 'current_repodata.json'

    curl_download(repodata_url, tmp_repodata)
    curl_download(bz2_repodata_url, tmp_bz2_repodata)
    try:
        curl_download(current_repodata_url, tmp_current_repodata)
    except:
        pass

    with tmp_repodata.open() as f:
        repodata = json.load(f)

    remote_filelist = []
    total_size = 0
    packages = repodata['packages']
    if 'packages.conda' in repodata:
        packages.update(repodata['packages.conda'])
    for filename, meta in packages.items():
        if meta['name'] in EXCLUDED_PACKAGES:
            continue

        file_size = meta['size']
        # prefer sha256 over md5
        sha256 = None
        md5 = None
        if 'sha256' in meta:
            sha256 = meta['sha256']
        elif 'md5' in meta:
            md5 = meta['md5']
        total_size += file_size

        pkg_url = '/'.join([repo_url, filename])
        dst_file = local_dir / filename
        dst_file_wip = local_dir / ('.downloading.' + filename)
        remote_filelist.append(dst_file)

        if dst_file.is_file():
            stat = dst_file.stat()
            local_filesize = stat.st_size

            if file_size == local_filesize:
                logging.info("Skipping {}".format(filename))
                continue

            dst_file.unlink()

        for retry in range(3):
            logging.info("Downloading {}".format(filename))
            try:
                err = curl_download(pkg_url, dst_file_wip, sha256=sha256, md5=md5)
                if err is None:
                    dst_file_wip.rename(dst_file)
            except sp.CalledProcessError:
                err = 'CalledProcessError'
            if err is None:
                break
            logging.error("Failed to download {}: {}".format(filename, err))

    if os.path.getsize(tmp_repodata) > GEN_METADATA_JSON_GZIP_THRESHOLD:
        sp.check_call(["gzip", "--no-name", "--keep", "--", str(tmp_repodata)])
        shutil.move(str(tmp_repodata) + ".gz", str(local_dir / "repodata.json.gz"))
    else:
        # If the gzip file is not generated, remove the dangling gzip archive
        try:
            os.remove(str(local_dir / "repodata.json.gz"))
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    shutil.move(str(tmp_repodata), str(local_dir / "repodata.json"))
    shutil.move(str(tmp_bz2_repodata), str(local_dir / "repodata.json.bz2"))
    tmp_current_repodata_gz_gened = False
    if tmp_current_repodata.is_file():
        if os.path.getsize(tmp_current_repodata) > GEN_METADATA_JSON_GZIP_THRESHOLD:
            sp.check_call(["gzip", "--no-name", "--keep", "--", str(tmp_current_repodata)])
            shutil.move(str(tmp_current_repodata) + ".gz", str(local_dir / "current_repodata.json.gz"))
            tmp_current_repodata_gz_gened = True
        shutil.move(str(tmp_current_repodata), str(
            local_dir / "current_repodata.json"))
    if not tmp_current_repodata_gz_gened:
        try:
            # If the gzip file is not generated, remove the dangling gzip archive
            os.remove(str(local_dir / "current_repodata.json.gz"))
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    if delete:
        local_filelist = []
        delete_count = 0
        for i in local_dir.glob('*.tar.bz2'):
            local_filelist.append(i)
        for i in local_dir.glob('*.conda'):
            local_filelist.append(i)
        for i in set(local_filelist) - set(remote_filelist):
            logging.info("Deleting {}".format(i))
            i.unlink()
            delete_count += 1
        logging.info("{} files deleted".format(delete_count))

    logging.info("{}: {} files, {} in total".format(
        repodata_url, len(remote_filelist), sizeof_fmt(total_size)))
    return total_size

def sync_installer(repo_url, local_dir: Path):
    logging.info("Start syncing {}".format(repo_url))
    local_dir.mkdir(parents=True, exist_ok=True)
    # full_scan = random.random() < 0.1 # Do full version check less frequently
    full_scan = True

    def remote_list():
        r = requests.get(repo_url, timeout=TIMEOUT_OPTION)
        d = pq(r.content)
        for tr in d('table').find('tr'):
            tds = pq(tr).find('td')
            if len(tds) != 4:
                continue
            fname = tds[0].find('a').text
            sha256 = tds[3].text
            if not any(fname.endswith(suffix) for suffix in CONDA_INSTALLER_ARCHES):
                continue
            if sha256 == '<directory>' or len(sha256) != 64:
                continue
            yield (fname, sha256)

    for filename, sha256 in remote_list():
        pkg_url = "/".join([repo_url, filename])
        dst_file = local_dir / filename
        dst_file_wip = local_dir / ('.downloading.' + filename)

        if dst_file.is_file():
            r = requests.head(pkg_url, allow_redirects=True, timeout=TIMEOUT_OPTION)
            len_avail = 'content-length' in r.headers
            if len_avail:
                remote_filesize = int(r.headers['content-length'])
            remote_date = parsedate_to_datetime(r.headers['last-modified'])
            stat = dst_file.stat()
            local_filesize = stat.st_size
            local_mtime = stat.st_mtime

            # Do content verification on ~5% of files (see issue #25)
            if (not len_avail or remote_filesize == local_filesize) and remote_date.timestamp() == local_mtime and \
                    (random.random() < 0.95 or sha256_check(dst_file, sha256)):
                logging.info("Skipping {}".format(filename))

                # Stop the scanning if the most recent version is present
                if not full_scan:
                    logging.info("Stop the scanning")
                    break

                continue

            logging.info("Removing {}".format(filename))
            dst_file.unlink()

        for retry in range(3):
            logging.info("Downloading {}".format(filename))
            err = ''
            try:
                err = curl_download(pkg_url, dst_file_wip, sha256=sha256)
                if err is None:
                    dst_file_wip.rename(dst_file)
            except sp.CalledProcessError:
                err = 'CalledProcessError'
            if err is None:
                break
            logging.error("Failed to download {}: {}".format(filename, err))

def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--working-dir", default=WORKING_DIR)
    parser.add_argument("--delete", action='store_true',
                        help='delete unreferenced package files')
    args = parser.parse_args()

    if args.working_dir is None:
        raise Exception("Working Directory is None")

    working_dir = Path(args.working_dir)
    size_statistics = 0
    random.seed()

    logging.info("Syncing installers...")
    for dist in ("miniconda", "archive"):
        remote_url = "{}/{}".format(CONDA_REPO_BASE_URL, dist)
        local_dir = working_dir / dist
        try:
            sync_installer(remote_url, local_dir)
            size_statistics += sum(
                f.stat().st_size for f in local_dir.glob('*') if f.is_file())
        except Exception:
            logging.exception("Failed to sync installers of {}".format(dist))

    for repo in CONDA_REPOS:
        for arch in CONDA_ARCHES:
            remote_url = "{}/pkgs/{}/{}".format(CONDA_REPO_BASE_URL, repo, arch)
            local_dir = working_dir / "pkgs" / repo / arch

            tmpdir = tempfile.mkdtemp()
            try:
                size_statistics += sync_repo(remote_url,
                                             local_dir, Path(tmpdir), args.delete)
            except Exception:
                logging.exception("Failed to sync repo: {}/{}".format(repo, arch))
            finally:
                shutil.rmtree(tmpdir)

    for repo in CONDA_CLOUD_REPOS:
        remote_url = "{}/{}".format(CONDA_CLOUD_BASE_URL, repo)
        local_dir = working_dir / "cloud" / repo

        tmpdir = tempfile.mkdtemp()
        try:
            size_statistics += sync_repo(remote_url,
                                         local_dir, Path(tmpdir), args.delete)
        except Exception:
            logging.exception("Failed to sync repo: {}".format(repo))
        finally:
            shutil.rmtree(tmpdir)

    print("Total size is", sizeof_fmt(size_statistics, suffix=""))

if __name__ == "__main__":
    main()

# vim: ts=4 sw=4 sts=4 expandtab

如果要使用该脚本,首先要安装 Python,然后再安装对应的 Python 软件包。启动指令为:

python anaconda.py --working-dir=[PATH]

其中 [PATH] 为储存镜像的目录,需要保证这个目录空间充足,建议至少留 500GB 空间。我宿舍的网速限制为 50Mbps,同步 800GB 花了两天。

运行 HTTP 服务

同步好后,就可以配置 Nginx HTTP 服务器了,当然如果懒得用 Nginx,也可以直接用 Python,性能差一点罢了:python -m http.server

Nginx 配置文件中,配置好 root 目录就行了。同时也可以打开 autoindex 功能,这样访问的时候就能列出所有内容了。配置文件的片段如下,记得把 [PATH] 替换为自己的目录:

root [PATH];

location / {
    try_files $uri $uri/ =404;
    autoindex on;
    autoindex_exact_size off;
    autoindex_localtime on;
}

最后,还可以用 crontab 实现定时任务,定时进行同步,使用 crontab -e 编辑任务,然后添加一行:

0 9 * * * /mirror/anaconda.py --working-dir=/mirror/conda

前面的五位为定时时间,上面设置的是每天早上 9 点,后面是定时运行的指令,记得自己调整目录。

配置 conda 镜像源

conda 的配置文件在 ~/.condarc,打开这个配置文件,设置:

show_channel_urls: true
default_channels:
  - https://conda.dorm.diona.moe/pkgs/main
  - https://conda.dorm.diona.moe/pkgs/free
  - https://conda.dorm.diona.moe/pkgs/r
  - https://conda.dorm.diona.moe/pkgs/msys2
custom_channels:
  conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  pytorch-lts: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
  simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud

default_channels 是同步的默认源,将其替换为我们本地镜像的地址。custom_channels 是额外的频道,我搭建的时候没有同步额外频道,所以这里我填写的是清华 TUNA 源。如果大家同步了额外频道,则将其对应的修改为本地镜像的地址。

本文链接:https://www.zouht.com/3310.html
本文使用:CC BY-NC-SA 4.0 许可
# # # #
首页      教程      自建 PyPI / Conda 本地缓存或镜像源

发表回复

textsms
account_circle
email

  • eriktse

    有没有自建docker镜像站的方法 [doge]

    9 月前 回复
    • @eriktse: 我去,那个大小就更加上天了。我印象里 Docker 镜像动不动就上 1GB,这要全部镜像下来我猜得几百 TB.

      9 月前 回复
  • galesaur

    那个py脚本运行报错了,提示
    File “anaconda1.py”, line 48
    def md5_check(file: Path, md5: str=None):
    file后那个:下有个箭头,咋处理呀?

    7 月前 回复

颢天

自建 PyPI / Conda 本地缓存或镜像源
Pip 和 Conda 是 Python 的两大软件包管理工具,它们的官方源在国内访问困难,下载速度非常慢。一般情况下我们使用的都是国内的镜像源,例如清华大学的 TUNA 镜像站、阿里云的镜像站。 …
扫描二维码继续阅读
2023-07-23