import json
import logging
from optparse import Values
from typing import TYPE_CHECKING, Iterator, List, Optional, Sequence, Tuple, cast

from pip._vendor.packaging.utils import canonicalize_name

from pip._internal.cli import cmdoptions
from pip._internal.cli.req_command import IndexGroupCommand
from pip._internal.cli.status_codes import SUCCESS
from pip._internal.exceptions import CommandError
from pip._internal.index.collector import LinkCollector
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import BaseDistribution, get_environment
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.network.session import PipSession
from pip._internal.utils.misc import stdlib_pkgs, tabulate, write_output
from pip._internal.utils.parallel import map_multithread

if TYPE_CHECKING:
    from pip._internal.metadata.base import DistributionVersion

    class _DistWithLatestInfo(BaseDistribution):
        """Give the distribution object a couple of extra fields.

        These will be populated during ``get_outdated()``. This is dirty but
        makes the rest of the code much cleaner.
        """
        latest_version: DistributionVersion
        latest_filetype: str

    _ProcessedDists = Sequence[_DistWithLatestInfo]


logger = logging.getLogger(__name__)


class ListCommand(IndexGroupCommand):
    """
    List installed packages, including editables.

    Packages are listed in a case-insensitive sorted order.
    """

    ignore_require_venv = True
    usage = """
      %prog [options]"""

    def add_options(self) -> None:
        self.cmd_opts.add_option(
            '-o', '--outdated',
            action='store_true',
            default=False,
            help='List outdated packages')
        self.cmd_opts.add_option(
            '-u', '--uptodate',
            action='store_true',
            default=False,
            help='List uptodate packages')
        self.cmd_opts.add_option(
            '-e', '--editable',
            action='store_true',
            default=False,
            help='List editable projects.')
        self.cmd_opts.add_option(
            '-l', '--local',
            action='store_true',
            default=False,
            help=('If in a virtualenv that has global access, do not list '
                  'globally-installed packages.'),
        )
        self.cmd_opts.add_option(
            '--user',
            dest='user',
            action='store_true',
            default=False,
            help='Only output packages installed in user-site.')
        self.cmd_opts.add_option(cmdoptions.list_path())
        self.cmd_opts.add_option(
            '--pre',
            action='store_true',
            default=False,
            help=("Include pre-release and development versions. By default, "
                  "pip only finds stable versions."),
        )

        self.cmd_opts.add_option(
            '--format',
            action='store',
            dest='list_format',
            default="columns",
            choices=('columns', 'freeze', 'json'),
            help="Select the output format among: columns (default), freeze, "
                 "or json",
        )

        self.cmd_opts.add_option(
            '--not-required',
            action='store_true',
            dest='not_required',
            help="List packages that are not dependencies of "
                 "installed packages.",
        )

        self.cmd_opts.add_option(
            '--exclude-editable',
            action='store_false',
            dest='include_editable',
            help='Exclude editable package from output.',
        )
        self.cmd_opts.add_option(
            '--include-editable',
            action='store_true',
            dest='include_editable',
            help='Include editable package from output.',
            default=True,
        )
        self.cmd_opts.add_option(cmdoptions.list_exclude())
        index_opts = cmdoptions.make_option_group(
            cmdoptions.index_group, self.parser
        )

        self.parser.insert_option_group(0, index_opts)
        self.parser.insert_option_group(0, self.cmd_opts)

    def _build_package_finder(
        self, options: Values, session: PipSession
    ) -> PackageFinder:
        """
        Create a package finder appropriate to this list command.
        """
        link_collector = LinkCollector.create(session, options=options)

        # Pass allow_yanked=False to ignore yanked versions.
        selection_prefs = SelectionPreferences(
            allow_yanked=False,
            allow_all_prereleases=options.pre,
        )

        return PackageFinder.create(
            link_collector=link_collector,
            selection_prefs=selection_prefs,
        )

    def run(self, options: Values, args: List[str]) -> int:
        if options.outdated and options.uptodate:
            raise CommandError(
                "Options --outdated and --uptodate cannot be combined.")

        cmdoptions.check_list_path_option(options)

        skip = set(stdlib_pkgs)
        if options.excludes:
            skip.update(canonicalize_name(n) for n in options.excludes)

        packages: "_ProcessedDists" = [
            cast("_DistWithLatestInfo", d)
            for d in get_environment(options.path).iter_installed_distributions(
                local_only=options.local,
                user_only=options.user,
                editables_only=options.editable,
                include_editables=options.include_editable,
                skip=skip,
            )
        ]

        # get_not_required must be called firstly in order to find and
        # filter out all dependencies correctly. Otherwise a package
        # can't be identified as requirement because some parent packages
        # could be filtered out before.
        if options.not_required:
            packages = self.get_not_required(packages, options)

        if options.outdated:
            packages = self.get_outdated(packages, options)
        elif options.uptodate:
            packages = self.get_uptodate(packages, options)

        self.output_package_listing(packages, options)
        return SUCCESS

    def get_outdated(
        self, packages: "_ProcessedDists", options: Values
    ) -> "_ProcessedDists":
        return [
            dist for dist in self.iter_packages_latest_infos(packages, options)
            if dist.latest_version > dist.version
        ]

    def get_uptodate(
        self, packages: "_ProcessedDists", options: Values
    ) -> "_ProcessedDists":
        return [
            dist for dist in self.iter_packages_latest_infos(packages, options)
            if dist.latest_version == dist.version
        ]

    def get_not_required(
        self, packages: "_ProcessedDists", options: Values
    ) -> "_ProcessedDists":
        dep_keys = {
            canonicalize_name(dep.name)
            for dist in packages
            for dep in (dist.iter_dependencies() or ())
        }

        # Create a set to remove duplicate packages, and cast it to a list
        # to keep the return type consistent with get_outdated and
        # get_uptodate
        return list({pkg for pkg in packages if pkg.canonical_name not in dep_keys})

    def iter_packages_latest_infos(
        self, packages: "_ProcessedDists", options: Values
    ) -> Iterator["_DistWithLatestInfo"]:
        with self._build_session(options) as session:
            finder = self._build_package_finder(options, session)

            def latest_info(
                dist: "_DistWithLatestInfo"
            ) -> Optional["_DistWithLatestInfo"]:
                all_candidates = finder.find_all_candidates(dist.canonical_name)
                if not options.pre:
                    # Remove prereleases
                    all_candidates = [candidate for candidate in all_candidates
                                      if not candidate.version.is_prerelease]

                evaluator = finder.make_candidate_evaluator(
                    project_name=dist.canonical_name,
                )
                best_candidate = evaluator.sort_best_candidate(all_candidates)
                if best_candidate is None:
                    return None

                remote_version = best_candidate.version
                if best_candidate.link.is_wheel:
                    typ = 'wheel'
                else:
                    typ = 'sdist'
                dist.latest_version = remote_version
                dist.latest_filetype = typ
                return dist

            for dist in map_multithread(latest_info, packages):
                if dist is not None:
                    yield dist

    def output_package_listing(
        self, packages: "_ProcessedDists", options: Values
    ) -> None:
        packages = sorted(
            packages,
            key=lambda dist: dist.canonical_name,
        )
        if options.list_format == 'columns' and packages:
            data, header = format_for_columns(packages, options)
            self.output_package_listing_columns(data, header)
        elif options.list_format == 'freeze':
            for dist in packages:
                if options.verbose >= 1:
                    write_output("%s==%s (%s)", dist.raw_name,
                                 dist.version, dist.location)
                else:
                    write_output("%s==%s", dist.raw_name, dist.version)
        elif options.list_format == 'json':
            write_output(format_for_json(packages, options))

    def output_package_listing_columns(
        self, data: List[List[str]], header: List[str]
    ) -> None:
        # insert the header first: we need to know the size of column names
        if len(data) > 0:
            data.insert(0, header)

        pkg_strings, sizes = tabulate(data)

        # Create and add a separator.
        if len(data) > 0:
            pkg_strings.insert(1, " ".join(map(lambda x: '-' * x, sizes)))

        for val in pkg_strings:
            write_output(val)


def format_for_columns(
    pkgs: "_ProcessedDists", options: Values
) -> Tuple[List[List[str]], List[str]]:
    """
    Convert the package data into something usable
    by output_package_listing_columns.
    """
    running_outdated = options.outdated
    # Adjust the header for the `pip list --outdated` case.
    if running_outdated:
        header = ["Package", "Version", "Latest", "Type"]
    else:
        header = ["Package", "Version"]

    data = []
    if options.verbose >= 1 or any(x.editable for x in pkgs):
        header.append("Location")
    if options.verbose >= 1:
        header.append("Installer")

    for proj in pkgs:
        # if we're working on the 'outdated' list, separate out the
        # latest_version and type
        row = [proj.raw_name, str(proj.version)]

        if running_outdated:
            row.append(str(proj.latest_version))
            row.append(proj.latest_filetype)

        if options.verbose >= 1 or proj.editable:
            row.append(proj.location or "")
        if options.verbose >= 1:
            row.append(proj.installer)

        data.append(row)

    return data, header


def format_for_json(packages: "_ProcessedDists", options: Values) -> str:
    data = []
    for dist in packages:
        info = {
            'name': dist.raw_name,
            'version': str(dist.version),
        }
        if options.verbose >= 1:
            info['location'] = dist.location or ""
            info['installer'] = dist.installer
        if options.outdated:
            info['latest_version'] = str(dist.latest_version)
            info['latest_filetype'] = dist.latest_filetype
        data.append(info)
    return json.dumps(data)
