Source code for ddrescue_gui.Tools.mount_tools

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Destination file mounting tools in the Tools Package for DDRescue-GUI
# This file is part of DDRescue-GUI.
# Copyright (C) 2013-2023 Hamish McIntyre-Bhatty
# DDRescue-GUI is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3 or,
# at your option, any later version.
#
# DDRescue-GUI is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DDRescue-GUI.  If not, see <http://www.gnu.org/licenses/>.

# pylint: disable=no-member,logging-not-lazy,no-else-return
#
#Reason (no-member): False positives, API changes.
#Reason (logging-not-lazy): This is a more readable way of logging.
#Reason (no-else-return): Lots of false positives.

"""
This is the destination file mount tools module in the tools package for DDRescue-GUI.
"""

import os
import plistlib
import json
import logging
import magic
import wx

from . import core as CoreTools

#Determine if running on Linux or Mac, and set the platform-dependent
#module constants. Every branch defines the same set of names
#(RESOURCEPATH, LINUX, PARTED_MAGIC, CYGWIN) so that code elsewhere
#can read any of them regardless of the platform.
if "wxGTK" in wx.PlatformInfo:
    #Set the resource path to /usr/share/ddrescue-gui/
    RESOURCEPATH = '/usr/share/ddrescue-gui'
    LINUX = True

    #Check if we're running on Parted Magic.
    PARTED_MAGIC = (os.uname()[1] == "PartedMagic")

    #Check if we're running on Cygwin.
    CYGWIN = ("CYGWIN" in os.uname()[0])

elif "wxMac" in wx.PlatformInfo:
    #Set the resource path from an environment variable, as mac .apps can
    #be found in various places. Use '.' as the resource path as a fallback.
    RESOURCEPATH = os.environ.get('RESOURCEPATH', ".")

    LINUX = False
    PARTED_MAGIC = False

    #We can't be on Cygwin if we're on macOS. Previously this name was
    #left undefined on this branch.
    CYGWIN = False

#Dictionary variables.
SETTINGS = {}

#Set up logging. Follow the effective level of the application's main logger.
logger = logging.getLogger(__name__)
logger.setLevel(logging.getLogger("DDRescue-GUI").getEffectiveLevel())

# -------------------- CORE METHODS --------------------
class Core:
    """
    This class contains core methods used on both Linux and macOS.

    All state is held in class attributes, and all methods are classmethods,
    so the class is used directly and never instantiated in this module.
    """

    #The output file we're mounting.
    output_file = None

    #Holds the mount point of the output file/volume on it that we mounted.
    output_file_mountpoint = None

    #The type(s) of the output file. If we mount an LVM volume on a device,
    #this may be set to ["Device", "LVM"] for example.
    output_file_types = []

    #The device name(s) of the output file. If we mount an LVM volume, this
    #may be set to ["/dev/sde", "/dev/sde1"]. Corresponding indexes correlate
    #with the above variable.
    output_file_devicenames = []

    @classmethod
    def reset(cls):
        """
        Resets the state of this class, and triggers reset of the Linux and
        Mac classes.

        The list attributes are rebound to new empty lists rather than
        cleared in place.
        """

        cls.output_file = None
        cls.output_file_mountpoint = None
        cls.output_file_types = []
        cls.output_file_devicenames = []

        Linux.reset()
        Mac.reset()

    @classmethod
    def mount_output_file(cls):
        """
        Mount the output file in SETTINGS["OutputFile"].

        The type of the output file is determined first; partitions are
        mounted directly, and anything else is handed to the platform's
        mount_device method.

        Returns:
            boolean.
                True - Success
                False - Failed
        """

        logger.info("Core.mount_output_file(): Mounting Disk: "+SETTINGS["OutputFile"]+"...")

        #Determine what type of OutputFile we have (Partition or Device).
        if LINUX:
            _type, success = Linux.determine_output_file_type(SETTINGS["OutputFile"])

        else:
            _type, success = Mac.determine_output_file_type(SETTINGS["OutputFile"])

        #If we failed, report to user.
        if not success:
            logger.error("Core.mount_output_file(): Error! Warning the user...")
            dlg = wx.MessageDialog(None, "Couldn't mount your output file. The hard disk "
                                   "image utility failed to run. This could mean your disk image "
                                   "is damaged, and you need to use a different tool to read it.",
                                   "DDRescue-GUI - Error!", style=wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()
            return False

        #Record the output file and its type so it can be unmounted later.
        Core.output_file_devicenames.append(SETTINGS["OutputFile"])
        Core.output_file_types.append(_type)

        if "Partition" in Core.output_file_types:
            #We have a partition.
            logger.debug("Core.mount_output_file(): Output file is a partition...")

            #Attempt to mount the disk.
            if LINUX:
                return Linux.mount_partition(SETTINGS["OutputFile"])

            return Mac.mount_partition(SETTINGS["OutputFile"], attach=True)

        #We have a device/container of some kind.
        if LINUX:
            return Linux.mount_device(SETTINGS["OutputFile"])

        return Mac.mount_device(SETTINGS["OutputFile"])

    @classmethod
    def unmount_output_file(cls):
        """
        Unmount the output file.

        Devices are unmounted in the reverse of the order they were recorded
        in. If the unmount does not fully succeed, the recorded order is
        restored, so that a later retry unmounts in the same (reverse) order
        again. On success, all state is reset.

        Returns:
            boolean.
                True - Success
                False - Failed
        """

        logger.info("Core.unmount_output_file(): Attempting to unmount output file...")

        #Unmount these in reverse order, otherwise it won't work.
        #The reversal is done in place because Linux.unmount_output_file()
        #looks types up by position in these lists.
        Core.output_file_devicenames.reverse()
        Core.output_file_types.reverse()

        success = False

        try:
            success = Core._unmount_all_in_order()

        finally:
            #Put the lists back in their original order if we didn't finish.
            #Otherwise, the next attempt would reverse them again and unmount
            #everything in the wrong order.
            if not success:
                Core.output_file_devicenames.reverse()
                Core.output_file_types.reverse()

        #Reset everything if it worked.
        if success:
            Core.reset()

        return success

    @classmethod
    def _unmount_all_in_order(cls):
        """
        Unmounts every recorded device, in the current order of
        Core.output_file_devicenames.

        Returns:
            boolean.
                True - Everything was unmounted.
                False - At least one step failed.
        """

        success = True

        for disk in Core.output_file_devicenames:
            logger.info("Core.unmount_output_file(): Unmounting "+disk+"...")

            #Try to umount the output file, if it has been mounted.
            if Core.output_file_mountpoint is not None:
                if CoreTools.unmount_disk(Core.output_file_mountpoint) == 0:
                    logger.info("Core.unmount_output_file(): Successfully unmounted "
                                "output file...")

                else:
                    logger.error("Core.unmount_output_file(): Error unmounting output "
                                 "file! Warning user...")

                    dlg = wx.MessageDialog(None, "It seems your output file is in use. "
                                           "Please close all applications that could be using it "
                                           "and try again.", "DDRescue-GUI - Warning",
                                           style=wx.OK | wx.ICON_INFORMATION)

                    dlg.ShowModal()
                    dlg.Destroy()
                    return False

            if LINUX:
                if not Linux.unmount_output_file(disk):
                    success = False

            else:
                #Only device nodes are passed to the Mac unmount method,
                #not image files.
                if "/dev" in disk:
                    if not Mac.unmount_output_file(disk):
                        success = False

        return success
#------------------------------------ LINUX-SPECIFIC FUNCTIONS ------------------------------------
class Linux:
    """
    Linux-specific stuff for mounting the output file.

    State is held in class attributes and all methods are class/static
    methods, so the class is used directly and never instantiated here.
    """

    #The name of the LVM volume group we activated, if any.
    volume_group_name = None

    #Whether the last standard-device scan had to map the file with kpartx.
    using_loop_device = False

    @classmethod
    def reset(cls):
        """
        Resets the state of this class to defaults.
        """

        cls.volume_group_name = None
        cls.using_loop_device = False

    @staticmethod
    def _parted_table_type(parted_output):
        """
        Extracts the partition table type from "parted -sm ... print" output.

        Args:
            parted_output (str):    The output from parted.

        Returns:
            string or None. Field 6 of the first record line that isn't the
            "BYT;" header, or None if there was no such line/field.
        """

        for line in parted_output.split("\n"):
            #All output lines end with a semicolon, and the first line can be ignored.
            #Errors from parted can be mixed in, and they don't end in semicolons.
            if line.endswith(";") and "BYT;" not in line:
                fields = line.split(":")

                if len(fields) > 5:
                    return fields[5]

                return None

        return None

    @classmethod
    def determine_output_file_type(cls, output_file): #pylint: disable=invalid-name
        """
        Determines output File Type (partition or device).

        Tries parted first, then "file -s" to detect LVM, and finally asks
        the user if the type still isn't known.

        Args:
            output_file (str):      The output file or device to determine the type of.

        Returns:
            tuple(string, bool).

                1st element:        The type of the output file. "Partition", "Device",
                                    or "LVM" ("unknown" on failure).

                2nd element:        True - success, False - failed.

        """

        #Set a default.
        output_file_type = "unknown"

        #--------------- USING PARTED TO DETECT PARTITION TABLES AND FILESYSTEMS ---------------
        #If list of partitions is empty (or 1 partition), we have a partition.
        retval, output = CoreTools.start_process(cmd="parted -sm '"+output_file+"' print",
                                                 return_output=True, privileged=True)

        #NOTE: Exit code 1 on CD images, but still works.
        if retval not in (0, 1):
            return "unknown", False

        #We want field 6 - the partition table type.
        table_type = Linux._parted_table_type(output)

        #The type will be "loop" if this is a partition.
        if table_type == "loop":
            output_file_type = "Partition"

        #If we have any valid partition table, this is a device.
        elif table_type in ("msdos", "gpt", "mac", "pc98", "sun", "dvh", "bsd", "amiga",
                            "aix", "atari"):

            output_file_type = "Device"

        #If the type is unknown, this might be an ISO file (which we treat as a partition).
        elif table_type == "unknown":
            if "ISO 9660 CD-ROM filesystem data" in magic.from_file(output_file):
                output_file_type = "Partition"

        #--------------- DETECTING LVM CONTAINERS ---------------
        #Check if this is a LVM container if parted didn't help.
        if output_file_type == "unknown":
            #Check for LVM.
            output = CoreTools.start_process(cmd="file -s '"+output_file+"'",
                                             return_output=True, privileged=True)[1]

            if "LVM" in output:
                output_file_type = "LVM"

        #Ask the user if we don't know what type the input file is.
        if output_file_type == "unknown":
            choices = ["Partition (single volume or CD/DVD image)",
                       "Device (multiple partitions, choose if in doubt)",
                       "LVM Container"]

            dlg = wx.SingleChoiceDialog(wx.GetApp().TopWindow.panel,
                                        "What type of file/device did you recover from?",
                                        "DDRescue-GUI - Question", choices,
                                        pos=wx.DefaultPosition)

            if dlg.ShowModal() == wx.ID_OK:
                answer = dlg.GetStringSelection()
                dlg.Destroy()

            #If the user doesn't answer, give up.
            else:
                dlg.Destroy()
                return "unknown", False

            #The first word in our human-readable choices is the type.
            output_file_type = answer.split()[0]

        return output_file_type, True

    #--------------- FUNCTIONS FOR GETTING VOLUME INFORMATION ---------------
    @staticmethod
    def _loop_device_from_kpartx(kpartx_output):
        """
        Works out the loop device name from the output of "kpartx -av".

        The kpartx output lists the partitions only,
        eg: add map loop1p1 (253:0): 0 251904 linear 7:1 2048

        Args:
            kpartx_output (list):   The lines of output from kpartx.

        Returns:
            string or None. The loop device name (eg "loop1"), or None if
            it couldn't be determined.
        """

        try:
            #First, get the loop-and-partition section (eg loop1p1).
            temp = kpartx_output[0].split()[2]

        except IndexError:
            return None

        #Now get rid of the partition number to get just the loop device name.
        #Splitting on "p" also splits "loop" itself, hence keeping 2 pieces.
        return 'p'.join(temp.split("p")[0:2])

    @staticmethod
    def _volume_choices(lsblk_data, target_device):
        """
        Builds the human-readable list of partitions on the given device.

        Args:
            lsblk_data (dict):      Parsed output from "lsblk -J -o NAME,FSTYPE,SIZE".
            target_device (str):    The device to list partitions for, with or
                                    without the leading "/dev/".

        Returns:
            list. One "Partition <name>, Filesystem: <fs>, Size: <size>" string
            for each partition found.
        """

        choices = []

        for device in lsblk_data.get("blockdevices", []):
            #Ignore other devices.
            if target_device not in (device["name"], "/dev/"+device["name"]):
                continue

            #Add all the partitions to the choices list. Devices with no
            #partitions have no "children" key at all, so don't assume one.
            for disk in device.get("children", []):
                #Add stuff, trying to keep it human-readable.
                fstype = disk["fstype"]

                if fstype is None:
                    fstype = "None"

                choices.append("Partition "+disk["name"]
                               + ", Filesystem: "+fstype
                               + ", Size: "+disk["size"])

        return choices

    @classmethod
    def get_volumes_std_device(cls, output_file):
        """
        Gets a list of volumes on the given output file or device name.
        This method expects the given file or device to be a standard
        device.

        If the output file isn't under /dev/, it is mapped with kpartx first,
        and Linux.using_loop_device is set to True.

        Args:
            output_file (str):      The output file or device to get volumes for.

        Returns.
            list.                   The volumes that were found in human-readable form.
                                    Empty if none were found, or on error.
        """

        Linux.using_loop_device = False
        kpartx_output = []

        #Create a loop device if this is a file and a regular device.
        if "/dev/" not in output_file:
            Linux.using_loop_device = True

            logger.info("Linux.get_volumes_std_device(): Creating loop device...")
            kpartx_output = CoreTools.start_process(cmd="kpartx -av '"
                                                    + output_file+"'",
                                                    return_output=True, privileged=True)[1]

            kpartx_output = kpartx_output.split("\n")

        #Do a part probe to make sure the loop device has been searched.
        CoreTools.start_process(cmd="partprobe", privileged=True)

        #Get some Disk information.
        lsblk_output = CoreTools.start_process(cmd="lsblk -J -o NAME,FSTYPE,SIZE",
                                               return_output=True,
                                               privileged=True)[1].split("\n")

        #Remove any errors from lsblk in the output.
        lsblk_output = '\n'.join(line for line in lsblk_output if "lsblk:" not in line)

        #Parse into a dictionary w/ json.
        try:
            lsblk_output = json.loads(lsblk_output)

        except ValueError as error:
            logger.error("Linux.get_volumes_std_device(): Failed to run lsblk. Error:"
                         +str(error))

            dlg = wx.MessageDialog(None, "Failed to gather information about the output file."
                                   "This could indicate a problem with your recovered image. "
                                   "It's possible the data you recovered is partially corrupted, "
                                   "and you need to use another tool to extract meaningful data "
                                   "from it.", "DDRescue-GUI - Error", style=wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()

            #Return an empty list (not False) so the return type is consistent.
            return []

        if Linux.using_loop_device:
            #Get the name of the loop device.
            target_device = Linux._loop_device_from_kpartx(kpartx_output)

            if target_device is None:
                return []

        else:
            target_device = output_file

        #Get the info related to this device.
        return Linux._volume_choices(lsblk_output, target_device)

    @classmethod
    def get_volumes_lvm(cls, output_file):
        """
        Gets a list of volumes on the given output file or device name.
        This method expects the given file or device to be an LVM
        Physical Volume.

        On success, the volume group is activated, and its name is stored
        in Linux.volume_group_name.

        Args:
            output_file (str):      The output file or device to get volumes for.

        Returns.
            list.                   The volumes that were found in human-readable form.
                                    Empty if none were found, or on error.
        """

        #First, set up a loop device if this is a file.
        if "/dev/" not in output_file:
            counter = 0

            #Practically speaking, there is no limit to how many loop devices there can be.
            #Use the first loop device name that doesn't exist yet.
            while os.path.exists("/dev/loop"+str(counter)):
                counter += 1

            pv_device = "/dev/loop"+str(counter)

            retval = CoreTools.start_process(cmd="losetup "+pv_device+" '"+output_file+"'",
                                             privileged=True)

            if retval != 0:
                logger.error("Linux.get_volumes_lvm(): Unable to set up loop device!")
                return []

        else:
            pv_device = output_file

        retval, output = CoreTools.start_process(cmd="pvs -y", return_output=True,
                                                 privileged=True)

        if retval != 0:
            logger.error("Linux.get_volumes_lvm(): Could not obtain information about LVM PVs!")
            return []

        #Read pvs's output to find the volume group name for this device.
        volume_group_name = None

        for line in output.split("\n"):
            fields = line.split()

            if pv_device in line and len(fields) > 1:
                volume_group_name = fields[1]

        #Give up if pvs didn't list our device. Otherwise, we'd try to build
        #the vgchange command with None.
        if volume_group_name is None:
            logger.error("Linux.get_volumes_lvm(): Could not find the volume group for "
                         +pv_device+"!")
            return []

        Linux.volume_group_name = volume_group_name

        #Activate the volume group.
        retval = CoreTools.start_process(cmd="vgchange -a y "+Linux.volume_group_name,
                                         privileged=True)

        if retval != 0:
            logger.error("Linux.get_volumes_lvm(): Unable to activate volume group!")
            return []

        #Find logical volumes.
        retval, lvdisplay_output = CoreTools.start_process(cmd="lvdisplay -C --units M",
                                                           return_output=True, privileged=True)

        if retval != 0:
            logger.error("Linux.get_volumes_lvm(): Unable to obtain information about LVM LVs!")
            return []

        choices = []

        #Find all volumes that correspond to our volume group.
        for line in lvdisplay_output.split("\n"):
            if Linux.volume_group_name not in line:
                continue

            splitline = line.split()

            #Field 1 is the volume name, and field 4 is used as the size.
            if len(splitline) > 3:
                choices.append("Volume "+splitline[0]
                               + ", Size: "+splitline[3])

        return choices

    #--------------- MOUNTING AND UNMOUNTING FUNCTIONS ---------------
    @classmethod
    def mount_partition(cls, partition):
        """
        Mounts the given file or device name as a single volume or partition.
        The volume is mounted read-only at /tmp/ddrescue-gui/destination, and
        the user is warned if mounting fails.

        Args:
            partition (str):        The file or device to mount.

        Returns:
            Boolean.
                True - Success
                False - Failed
        """

        #We will mount the file/device in /tmp/ddrescue-gui/destination
        Core.output_file_mountpoint = "/tmp/ddrescue-gui/destination"

        retval = CoreTools.mount_disk(partition=partition,
                                      mount_point=Core.output_file_mountpoint,
                                      options="-r")

        if retval != 0:
            logger.error("Linux.mount_partition(): Error! Warning the user...")
            dlg = wx.MessageDialog(None, "Couldn't mount your output file. Most "
                                   "probably, the filesystem is damaged and you'll need to "
                                   "use another tool to read it from here. It could also be "
                                   "that your OS doesn't support this filesystem, or that "
                                   "the recovery is incomplete, as that can sometimes cause "
                                   "this problem.", "DDRescue-GUI - Error!",
                                   style=wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()
            return False

        return True

    @classmethod
    def mount_device(cls, output_file):
        """
        Mounts the given output file or device, expecting it to be a standard device
        or another kind of container for volumes - LVM.

        The user is asked which volume to mount if there is more than one. If
        the selected partition of a standard device holds LVM, this method
        calls itself to mount a volume from inside it.

        Args:
            output_file (str).          The device or file to mount.

        Returns:
            Boolean.
                True - Success
                False - Failure
        """

        #Create a nice list of volumes for the user to pick from.
        choices = []

        logger.debug("Linux.mount_device(): Output file isn't a partition! Getting "
                     "list of contained volumes...")

        #Only look at the last type - this way if we're mounting a sub-partition, we'll collect
        #information for that, not the container it's inside.
        #Remember it now, because we add to the list of types further down.
        container_type = Core.output_file_types[-1]

        if container_type == "Device":
            choices = Linux.get_volumes_std_device(output_file)

        #Find LVM volumes.
        elif container_type == "LVM":
            choices = Linux.get_volumes_lvm(output_file)

        #Check that this list isn't empty.
        if not choices:
            logger.error("Linux.mount_device(): Couldn't find any partitions "
                         "to mount!")

            dlg = wx.MessageDialog(None, "Couldn't find any partitions to mount! "
                                   "This could indicate a problem with your recovered image. "
                                   "It's possible the data you recovered is partially corrupted, "
                                   "and you need to use another tool to extract meaningful data "
                                   "from it.", "DDRescue-GUI - Error", style=wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()
            return False

        if len(choices) >= 2:
            #Sort the list alphabetically (it can sometimes be out of order).
            choices.sort()

            #Ask the user which partition to mount.
            logger.debug("mount_output_file(): Asking user which partition to mount...")
            dlg = wx.SingleChoiceDialog(None, "Please select which partition you wish "
                                        "to mount.", "DDRescue-GUI - Select a Partition", choices)

            #Respond to the user's action.
            if dlg.ShowModal() != wx.ID_OK:
                dlg.Destroy()
                Core.output_file_mountpoint = None
                logger.debug("mount_output_file(): User cancelled operation. "
                             "Cleaning up...")

                Core.unmount_output_file()
                return False

            #Get selected partition's name.
            full_selection = dlg.GetStringSelection()
            dlg.Destroy()

        else:
            #There is only 1 choice so we'll pick that automatically.
            full_selection = choices[0]

        #The name is the second word of the choice, with the comma stripped.
        selected_partition = full_selection.split()[1].replace(",", "")

        #Work out the device node for the selected volume.
        if container_type == "Device":
            if Linux.using_loop_device:
                device_to_mount = "/dev/mapper/"+selected_partition

            else:
                device_to_mount = "/dev/"+selected_partition

        else:
            #This must be LVM, or we'd have had no choices.
            device_to_mount = "/dev/"+Linux.volume_group_name+"/"+selected_partition

        #Add this partition to the list so it is unmounted before
        #deactivating the volume group.
        Core.output_file_types.append("Partition")
        Core.output_file_devicenames.append(device_to_mount)

        #Caveats for mounting LVM volumes just selected. This must test the
        #type of the container we were given, not the entry we just added.
        if container_type == "Device" and "LVM" in full_selection:
            Core.output_file_types.append("LVM")
            Core.output_file_devicenames.append(device_to_mount)

            #Pass on failures from mounting the volume inside.
            if not Linux.mount_device(device_to_mount):
                return False

        #Attempt to mount, and handle it if the mount attempt failed.
        elif not Linux.mount_partition(device_to_mount):
            return False

        logger.info("Linux.mount_device(): Success! Waiting for user to finish "
                    "with it and prompt to unmount it...")

        return True

    @classmethod
    def unmount_output_file(cls, output_file):
        """
        Unmounts the output file or device. Handles partitions, devices, and LVM disks.

        The type is looked up from the first entry for output_file in
        Core.output_file_devicenames. Partitions need no extra work here.

        Args:
            output_file (str).          The device or file to unmount.

        Returns:
            Boolean.
                True - Success
                False - Failed
        """

        _type = Core.output_file_types[Core.output_file_devicenames.index(output_file)]

        #Pull down loops if the OutputFile is a Device.
        if _type == "Device":
            #This won't error on LINUX even if the loop device wasn't set up.
            logger.debug("unmount_output_file(): Pulling down loop device...")
            cmd = "kpartx -d '"+output_file+"'"

        #Deactivate volume group if needed.
        elif _type == "LVM":
            #Shouldn't cause an error if volume group is already deactivated.
            logger.debug("Linux.unmount_output_file(): Deactivating volume group...")
            cmd = "vgchange -a n "+Linux.volume_group_name

        else:
            #Partition (or anything else), no extra command needed. Return True.
            return True

        if CoreTools.start_process(cmd=cmd, return_output=False, privileged=True) == 0:
            logger.info("Linux.unmount_output_file(): Success...")
            return True

        logger.info("Linux.unmount_output_file(): Failed to pull down the "
                    "loop device! Warning user...")

        dlg = wx.MessageDialog(None, "Couldn't finish unmounting your output file! "
                               "Please close all applications that could be using it and "
                               "try again.", "DDRescue-GUI - Warning",
                               style=wx.OK | wx.ICON_INFORMATION)

        dlg.ShowModal()
        dlg.Destroy()
        return False
#------------------------------------ MACOS-SPECIFIC FUNCTIONS ------------------------------------
class Mac:
    """
    Macos-specific stuff for mounting the output file.

    All methods are class/static methods that drive hdiutil, so the class
    is used directly and never instantiated in this module.
    """

    @classmethod
    def reset(cls):
        """
        Resets the state of this class to defaults. No action needed, just here to
        retain compatibility.
        """

    @staticmethod
    def _load_plist(text):
        """
        Parses plist output from hdiutil, tolerating junk in the output.

        Args:
            text (str).     The output from hdiutil.

        Returns:
            The parsed plist (normally a dict), or None if the text could
            not be parsed as a plist.
        """

        #Imported here because only this class needs it.
        from xml.parsers.expat import ExpatError

        #Fix for older macOS versions that put kernel messages in the output.
        cleaned = "\n".join(line for line in text.split("\n")
                            if "nx_kernel_mount" not in line)

        try:
            return plistlib.loads(cleaned.encode())

        except (ValueError, ExpatError):
            #ValueError includes plistlib.InvalidFileException.
            return None

    @classmethod
    def _get_partition_table(cls, output_file):
        """
        Gets the block size and partition list of the given file or device
        from "hdiutil imageinfo".

        Args:
            output_file (str).      The output file or device to investigate.

        Returns:
            tuple(int, list) or None. The block size and the list of partition
            entries, or None if the information could not be obtained.
        """

        imageinfo = Mac._load_plist(Mac.run_hdiutil(options="imageinfo '"+output_file
                                                    +"' -plist")[1])

        try:
            return (imageinfo["partitions"]["block-size"],
                    imageinfo["partitions"]["partitions"])

        except (KeyError, TypeError):
            #TypeError covers the plist not being parsable at all (None).
            logger.error("Mac._get_partition_table(): Couldn't read partition "
                         "information for "+output_file+"!")

            return None

    @classmethod
    def determine_output_file_type(cls, output_file): #pylint: disable=invalid-name
        """
        Determines output File Type (partition or device), by looking for
        marker strings in the output of "hdiutil imageinfo".

        Args:
            output_file (str).      The output file or device to investigate.

        Returns:
            tuple(string, bool).

                1st element:        The type of the output file. "Partition", "Device",
                                    "CD", "APFSStore", "APFSContainer" or "APFSVolume".

                2nd element:        True - success, False - failed.
        """

        retval, output = Mac.run_hdiutil(options="imageinfo '"+output_file
                                         +"' -plist")

        #If "whole disk" is in the output, this is a partition.
        if "whole disk" in output and "APFS" not in output:
            output_file_type = "Partition"

        #If there's an ISO9660 filesystem, treat this as a CD image.
        elif "ISO9660" in output:
            output_file_type = "CD"

        #APFS stuff.
        elif "Apple_APFS" in output:
            if "whole disk" in output:
                output_file_type = "APFSContainer"

            elif "unknown partition" in output:
                output_file_type = "APFSVolume"

            else:
                output_file_type = "APFSStore"

        else:
            output_file_type = "Device"

        logger.debug("determine_output_file_type(): Type is "+output_file_type+"...")

        return output_file_type, retval == 0

    @classmethod
    def attach_file(cls, output_file):
        """
        Attaches the given output file to the system as a read-only device,
        without mounting anything on it.

        Args:
            output_file (str).          The output file to attach.

        Returns:
            tuple. Elements:
                1 - int.                The return value from hdiutil attach.
                2 - str.                The device name of the file, or None if
                                        attaching failed.
        """

        retval, output = Mac.run_hdiutil("attach '"+output_file
                                         + "' -nomount -readonly -plist")

        #Get the device name. The result is True on success, or a string
        #naming the error, so it must be compared against True explicitly.
        devicename, result = Mac.get_device_name(output)

        if result is not True:
            return retval, None

        return retval, devicename

    @classmethod
    def get_volumes_std_device(cls, output_file, cdimage=False):
        """
        Finds volumes contained by standard devices.

        Args:
            output_file (str).          The output file or device to investigate.
            cdimage[=False] (bool).     Whether or not we are finding volumes on a
                                        CD device/image.

        Returns.
            list.                   The volumes that were found in human-readable form.
                                    Empty if the partition information could
                                    not be read.
        """

        table = Mac._get_partition_table(output_file)

        if table is None:
            return []

        #Get the block size of the image, and the partitions.
        blocksize, partitions = table

        partno = 1
        choices = []

        for partition in partitions:
            size = str((partition["partition-length"] * blocksize) // 1000000)+" MB"

            if not cdimage:
                #Skip non-partition things and any "partitions" that don't have numbers.
                #CD images work differently, and we must ignore this rule.
                #NOTE(review): an entry with an APFS hint but no partition number
                #passes this check, but the line that builds the choice below
                #needs the number - confirm whether that can happen.
                if "partition-number" not in partition and \
                    "APFS" not in partition["partition-hint"]:

                    continue

            else:
                #Ignore "partitions" that don't start at 0.
                if partition["partition-start"] != 0:
                    continue

                #Set the partition number for CD images.
                partition["partition-number"] = partno
                partno += 1

                #Ignore partition size for CD images.
                size = "N/A"

            choices.append("Partition "+str(partition["partition-number"])
                           + ", with size "+size)

        return choices

    @classmethod
    def get_volumes_apfs(cls, output_file):
        """
        Finds volumes contained by APFS containers.

        Args:
            output_file (str).          The output file or device to investigate.

        Returns.
            list.                   The volumes that were found in human-readable form.
                                    Empty if the partition information could
                                    not be read.
        """

        table = Mac._get_partition_table(output_file)

        if table is None:
            return []

        #Get the block size of the image, and the partitions.
        blocksize, partitions = table

        partno = 1
        choices = []

        for partition in partitions:
            #Skip non-partition things and any "partitions" that don't have numbers.
            if "partition-number" not in partition and "APFS" not in partition["partition-hint"]:
                continue

            #Set the partition number for APFS volumes.
            if "APFS" in partition["partition-hint"]:
                partition["partition-number"] = partno
                partno += 1

            choices.append("Partition "+str(partition["partition-number"])
                           + ", with size "+str((partition["partition-length"] \
                           * blocksize) // 1000000) +" MB")

        return choices

    @classmethod
    def mount_partition(cls, partition, attach=False):
        """
        Mounts the given partition, also attaching the file if needed.
        The partition is mounted read-only at /tmp/ddrescue-gui/destination,
        and the user is warned if attaching or mounting fails.

        Args:
            partition (str).            The partition or file to mount.
            attach[=False] (bool).      Whether to attach the file first.

        Returns:
            boolean.
                True - Success
                False - Failed
        """

        #Attach the file first if needed. The device name is None if this failed.
        if attach:
            partition = Mac.attach_file(partition)[1]

        mounted = False

        if partition is None:
            #Don't try to mount a device we never got.
            logger.error("Mac.mount_partition(): Couldn't attach the output file!")

        else:
            #We will mount the file/device in /tmp/ddrescue-gui/destination
            Core.output_file_mountpoint = "/tmp/ddrescue-gui/destination"

            mounted = CoreTools.mount_disk(partition=partition,
                                           mount_point=Core.output_file_mountpoint,
                                           options="readOnly") == 0

        if not mounted:
            logger.error("Mac.mount_partition(): Error! Warning the user...")
            dlg = wx.MessageDialog(None, "Couldn't mount your output file. Most "
                                   "probably, the filesystem is damaged and you'll need to "
                                   "use another tool to read it from here. It could also be "
                                   "that your OS doesn't support this filesystem, or that "
                                   "the recovery is incomplete, as that can sometimes cause "
                                   "this problem.", "DDRescue-GUI - Error!",
                                   style=wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()
            return False

        return True

    @classmethod
    def mount_device(cls, output_file):
        """
        Mount the given device or file. This is expected to be a standard device or
        other container of volumes (eg an APFS container).

        The user is asked which partition to mount if there is more than one.
        The file is then attached with hdiutil, and the selected partition is
        mounted. APFS containers and stores found on the way are handled by
        calling this method again on them.

        Args:
            output_file (str).          The device or file to mount.

        Returns:
            Boolean.
                True - Success
                False - Failure
        """

        logger.debug("Mac.mount_device(): Output file isn't a partition! Getting "
                     "list of contained partitions...")

        choices = []

        #Only look at the last type - this way if we're mounting a sub-partition, we'll collect
        #information for that, not the container it's inside.
        if Core.output_file_types[-1] in ("Device", "APFSStore"):
            choices = Mac.get_volumes_std_device(output_file)

        elif Core.output_file_types[-1] == "CD":
            choices = Mac.get_volumes_std_device(output_file, cdimage=True)

        #APFS containers.
        elif Core.output_file_types[-1] in ("APFSContainer", "APFSVolume"):
            choices = Mac.get_volumes_apfs(output_file)

        #Check that this list isn't empty.
        if not choices:
            logger.error("Mac.mount_device(): Couldn't find any partitions to mount!")
            dlg = wx.MessageDialog(None, "Couldn't find any partitions to mount! "
                                   "This could indicate a problem with your recovered image. "
                                   "It's possible the data you recovered is partially corrupted, "
                                   "and you need to use another tool to extract meaningful data "
                                   "from it.", "DDRescue-GUI - Error", style=wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()
            return False

        if len(choices) >= 2:
            #Sort the list alphabetically (it can sometimes be out of order).
            choices.sort()

            #Ask the user which partition to mount.
            logger.debug("Mac.mount_device(): Asking user which partition to mount...")
            dlg = wx.SingleChoiceDialog(None, "Please select which partition you wish "
                                        "to mount.", "DDRescue-GUI - Select a Partition", choices)

            #Respond to the user's action.
            if dlg.ShowModal() != wx.ID_OK:
                dlg.Destroy()
                Core.output_file_mountpoint = None
                logger.debug("Mac.mount_device(): User cancelled operation. "
                             "Cleaning up...")

                return False

            #Get selected partition's name.
            full_selection = dlg.GetStringSelection()
            dlg.Destroy()

        else:
            #There is only 1 choice so we'll pick that automatically.
            full_selection = choices[0]

        #The number is the second word of the choice, with the comma stripped.
        selected_partition = full_selection.split()[1].replace(",", "")

        #Notify user of mount attempt.
        logger.info("Mac.mount_device(): Mounting partition "
                    + selected_partition+" of "+output_file+"...")

        #Attempt to attach the disk (this attaches all partitions inside).
        (retval, mount_output) = \
        Mac.run_hdiutil("attach '"+output_file+"' -readonly -nomount -plist")

        #Only parse the resulting plist if hdiutil succeeded - on failure the
        #output may not be a plist at all.
        disks = []

        if retval == 0:
            mount_output = Mac._load_plist(mount_output)

            if isinstance(mount_output, dict):
                #Get the list of disks attached.
                disks = mount_output.get("system-entities", [])

        #Handle it if the mount attempt failed.
        if not disks:
            logger.error("Mac.mount_device(): Error! Warning the user...")
            dlg = wx.MessageDialog(None, "Couldn't mount your output file. Most "
                                   "probably, the filesystem is damaged or unsupported "
                                   "and you'll need to use another tool to read it from "
                                   "here. It could also be that your recovery is incomplete, "
                                   "as that can sometimes cause this problem.",
                                   "DDRescue-GUI - Error!", style=wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()
            return False

        #We need to get the device name, so we can mount the partition we want.
        #Get the device name given to the output file.
        #Set this so if we don't find our partition, we can still unmount the image
        #when we report failure.
        Core.output_file_devicenames.append(disks[0]["dev-entry"])

        success = False

        #NOTE: The last type is deliberately re-read each time below, because
        #the recursive calls add to Core.output_file_types.
        if Core.output_file_types[-1] in ("Device", "APFSStore", "APFSContainer"):
            #Check that the filesystem the user wanted is among those that
            #have been marked mountable.
            for partition in disks:
                disk = partition["dev-entry"]

                #The partition number is whatever follows the last "s" in the
                #device name (eg /dev/disk5s1 -> 1).
                if Core.output_file_types[-1] in ("Device", "APFSStore") \
                    and disk.split("s")[-1] != selected_partition:

                    continue

                #Find the type of this partition.
                #NOTE(review): if this partition is never mounted below, success
                #stays as the result of this check - confirm that is intended.
                _type, success = Mac.determine_output_file_type(disk)

                #Check if the partition we want is mountable
                if partition["potentially-mountable"] and _type == "Partition":
                    success = Mac.mount_partition(disk)

                #If this is an APFS container and we haven't reached the last
                #disk yet, keep going.
                if Core.output_file_types[-1] == "APFSContainer" \
                    and disks.index(partition) != (len(disks) - 1):

                    continue

                #Handle APFS containers.
                if _type == "APFSContainer":
                    Core.output_file_types.append("APFSContainer")
                    Core.output_file_devicenames.append(disk)
                    success = Mac.mount_device(disk)

                #Handle APFS stores.
                elif _type == "APFSStore":
                    Core.output_file_types.append("APFSStore")
                    Core.output_file_devicenames.append(disk)
                    success = Mac.mount_device(disk)

        elif Core.output_file_types[-1] == "CD":
            disk = disks[0]["dev-entry"]

            if disks[0]["potentially-mountable"]:
                success = Mac.mount_partition(disk)

        if not success:
            logger.info("Mac.mount_device(): Unsupported or damaged filesystem. "
                        "Warning user and cleaning up...")

            Core.unmount_output_file()

            dlg = wx.MessageDialog(None, "That filesystem is either not supported by "
                                   "macOS, or it is damaged (perhaps because the recovery is "
                                   "incomplete). Please try again and select a different "
                                   "partition.", "DDRescue-GUI - Error", wx.OK | wx.ICON_ERROR)

            dlg.ShowModal()
            dlg.Destroy()
            return False

        logger.info("Mac.mount_device(): Success! Waiting for user to finish with "
                    "it and prompt to unmount it...")

        return True

    @classmethod
    def unmount_output_file(cls, devicename):
        """
        Unmounts the given device. Can be used for output files as well, but
        needs to be given the device associated with them.

        Devices that no longer exist are treated as already detached.

        Args:
            devicename (str).           The device to unmount.

        Returns:
            Boolean.
                True - Success
                False - Failed
        """

        #Always detach the image's device file.
        logger.debug("Mac.unmount_output_file(): Detaching the device that "
                     "represents the image...")

        cmd = "hdiutil detach "+devicename

        #Ignore when devices don't exist - can happen if already unmounted.
        if not os.path.exists(devicename):
            return True

        if CoreTools.start_process(cmd=cmd, return_output=False, privileged=True) == 0:
            logger.info("Mac.unmount_output_file(): Successfully pulled down "
                        "loop device...")

            return True

        logger.info("Mac.unmount_output_file(): Failed to pull down the "
                    "loop device! Warning user...")

        dlg = wx.MessageDialog(None, "Couldn't finish unmounting your output file! "
                               "Please close all applications that could be using it and "
                               "try again.", "DDRescue-GUI - Warning",
                               style=wx.OK | wx.ICON_INFORMATION)

        dlg.ShowModal()
        dlg.Destroy()
        return False

    @classmethod
    def get_device_name(cls, output):
        """
        Get the device name of an output file, given output from hdiutil attach -plist.

        If more than one entity was attached, the second one is used;
        otherwise the first.

        Args:
            output (string).            Output from "hdiutil attach -plist", the command
                                        used to mount the output file.

        Returns:
            tuple(<inconsistent types>).

                1st element:            The device name of the output file eg "/dev/disk5",
                                        or None if unable to determine it.

                2nd element:            True (boolean) if successful in determining
                                        device name. Otherwise, a string naming the
                                        error eg "IndexError".
        """

        #Imported here because only this class needs it.
        from xml.parsers.expat import ExpatError

        #Parse the plist (Property List).
        try:
            hdiutil_output = plistlib.loads(output.encode())

        except (ValueError, ExpatError) as error:
            #hdiutil gives us non-plist output when it fails.
            return None, type(error).__name__

        #Find the disk and get the device name.
        try:
            entities = hdiutil_output["system-entities"]

            if len(entities) > 1:
                mounted_disk = entities[1]

            else:
                mounted_disk = entities[0]

            return mounted_disk["dev-entry"], True

        except (IndexError, KeyError, TypeError) as error:
            return None, type(error).__name__

    @classmethod
    def run_hdiutil(cls, options):
        """
        Runs hdiutil on behalf of the rest of the program when called.
        Tries to handle and fix hdiutil errors (e.g. 'Resource Temporarily
        Unavailable') if they occur.

        If the first attempt fails, every device listed by "diskutil list" is
        detached (failures are ignored), and hdiutil is run once more.

        Args:
            options (string).               All of the options to pass to hdiutil.

        Returns:
            tuple(int, string).

                1st element:                The return value from hdiutil.
                2nd element:                The output from hdiutil.
        """

        retval, output = CoreTools.start_process(cmd="hdiutil "+options, return_output=True,
                                                 privileged=True)

        #Handle this common error - image in use.
        if "Resource temporarily unavailable" in output or retval != 0:
            logger.warning("Mac.run_hdiutil(): Attempting to fix hdiutil resource error...")

            #Fix by detaching all disks - certain disks eg system disk will fail, but it should
            #fix our problem. On OS X >= 10.11 we can check for "(disk image)", but cos we support
            #10.10, we have to just detach all possible disks and ignore failures.
            #TODO Consider dropping support for macOS 10.10 to improve reliability.
            #Or could detect version and behave differently on newer versions.
            #This bug doesn't seem to be a big deal anyway.
            for line in CoreTools.start_process(cmd="diskutil list",
                                                return_output=True)[1].split("\n"):
                try:
                    if line.split()[0].split("/")[1] == "dev":
                        #This is a line with a device name on it.
                        logger.warning("Mac.run_hdiutil(): Attempting to detach "
                                       + line.split()[0]+"...")

                        CoreTools.start_process(cmd="hdiutil detach "+line.split()[0],
                                                privileged=True)

                except IndexError:
                    #Blank lines, and lines without a device name.
                    pass

            #Try again.
            retval, output = CoreTools.start_process(cmd="hdiutil "+options,
                                                     return_output=True, privileged=True)

        return retval, output