ACIL FM
Dark
Refresh
Current DIR:
/usr/lib/python3.9/site-packages/cockpit
/
usr
lib
python3.9
site-packages
cockpit
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
channels
-
chmod
Open
Rename
Delete
data
-
chmod
Open
Rename
Delete
_vendor
-
chmod
Open
Rename
Delete
__pycache__
-
chmod
Open
Rename
Delete
beiboot.py
22.15 MB
chmod
View
DL
Edit
Rename
Delete
beipack.py
2.99 MB
chmod
View
DL
Edit
Rename
Delete
bridge.py
12 MB
chmod
View
DL
Edit
Rename
Delete
channel.py
22.32 MB
chmod
View
DL
Edit
Rename
Delete
config.py
3.37 MB
chmod
View
DL
Edit
Rename
Delete
internal_endpoints.py
5.95 MB
chmod
View
DL
Edit
Rename
Delete
jsonutil.py
7.42 MB
chmod
View
DL
Edit
Rename
Delete
osinfo.py
929 B
chmod
View
DL
Edit
Rename
Delete
packages.py
21.25 MB
chmod
View
DL
Edit
Rename
Delete
peer.py
12.45 MB
chmod
View
DL
Edit
Rename
Delete
polkit.py
7.4 MB
chmod
View
DL
Edit
Rename
Delete
polyfills.py
2.27 MB
chmod
View
DL
Edit
Rename
Delete
protocol.py
9.52 MB
chmod
View
DL
Edit
Rename
Delete
remote.py
8.93 MB
chmod
View
DL
Edit
Rename
Delete
router.py
10.08 MB
chmod
View
DL
Edit
Rename
Delete
samples.py
17.02 MB
chmod
View
DL
Edit
Rename
Delete
superuser.py
9.74 MB
chmod
View
DL
Edit
Rename
Delete
transports.py
17.92 MB
chmod
View
DL
Edit
Rename
Delete
_version.py
20 B
chmod
View
DL
Edit
Rename
Delete
__init__.py
68 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /usr/lib/python3.9/site-packages/cockpit/polkit.py
# This file is part of Cockpit.
#
# Copyright (C) 2023 Red Hat, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import asyncio
import locale
import logging
import os
import pwd
from typing import Dict, List, Sequence, Tuple

from cockpit._vendor.ferny import AskpassHandler
from cockpit._vendor.systemd_ctypes import Variant, bus

# that path is valid on at least Debian, Fedora/RHEL, and Arch
HELPER_PATH = '/usr/lib/polkit-1/polkit-agent-helper-1'

# object path under which the authentication agent object is added to the system bus
AGENT_DBUS_PATH = '/PolkitAgent'

logger = logging.getLogger(__name__)

# one entry of the identities list handed to the agent: (kind, details)
Identity = Tuple[str, Dict[str, Variant]]


# https://www.freedesktop.org/software/polkit/docs/latest/eggdbus-interface-org.freedesktop.PolicyKit1.AuthenticationAgent.html
# Note that we don't implement the CancelAuthentication() API. pkexec gets called in a way that has no opportunity to
# cancel an ongoing authentication from the pkexec side. On the UI side cancellation is implemented via the standard
# asyncio process mechanism. If we ever need CancelAuthentication(), we could keep a cookie → get_current_task()
# mapping, but that method is not available for Python 3.6 yet.
class org_freedesktop_PolicyKit1_AuthenticationAgent(bus.Object):
    """D-Bus object that answers polkit authentication requests.

    Password prompts are forwarded to the given responder; the actual
    authentication is performed by the polkit helper binary at HELPER_PATH.
    """

    def __init__(self, responder: AskpassHandler):
        """Remember the responder which is asked for passwords."""
        super().__init__()
        self.responder = responder

    # confusingly named: this actually does the whole authentication dialog, see docs
    @bus.Interface.Method('', ['s', 's', 's', 'a{ss}', 's', 'a(sa{sv})'])
    async def begin_authentication(self, action_id: str, message: str, icon_name: str,
                                   details: Dict[str, str], cookie: str, identities: Sequence[Identity]) -> None:
        """Run one complete authentication dialog through the polkit helper.

        Only authentication as the current effective user is supported: if
        none of `identities` is a 'unix-user' whose uid equals ours, a
        warning is logged and nothing else happens.  Otherwise the helper is
        started with our user name and `cookie`, and the conversation is
        driven by _communicate() until the helper is done.
        """
        logger.debug('BeginAuthentication: action %s, message "%s", icon %s, details %s, cookie %s, identities %r',
                     action_id, message, icon_name, details, cookie, identities)
        # only support authentication as ourselves, as we don't yet have the
        # protocol plumbing and UI to select an admin user
        my_uid = os.geteuid()
        for (auth_type, subject) in identities:
            if auth_type == 'unix-user' and 'uid' in subject and subject['uid'].value == my_uid:
                logger.debug('Authentication subject %s matches our uid %d', subject, my_uid)
                break
        else:
            # the loop finished without finding ourselves among the identities
            logger.warning('Not supporting authentication as any of %s', identities)
            return

        user_name = pwd.getpwuid(my_uid).pw_name
        process = await asyncio.create_subprocess_exec(HELPER_PATH, user_name, cookie,
                                                       stdin=asyncio.subprocess.PIPE,
                                                       stdout=asyncio.subprocess.PIPE)
        try:
            await self._communicate(process)
        except asyncio.CancelledError:
            # raised by _communicate() when no password was given, or by task
            # cancellation; either way stop the helper
            logger.debug('Cancelled authentication')
            process.terminate()
        finally:
            # always reap the helper, whichever way the conversation ended
            res = await process.wait()
            logger.debug('helper exited with code %i', res)

    async def _communicate(self, process: asyncio.subprocess.Process) -> None:
        """Drive the line-based conversation with the helper process.

        Each line read from the helper's stdout is split at the first space
        into a command and a value:

        - PAM_PROMPT*: ask the responder for a password, passing along the
          info/error messages accumulated so far, and write the answer plus
          a newline to the helper's stdin.
        - PAM_TEXT_INFO / PAM_ERROR: remember the value for the next prompt.
        - SUCCESS / FAILURE: stop reading.
        - anything else: terminate the helper and stop reading.

        Raises asyncio.CancelledError if the responder returns None.
        """
        assert process.stdin
        assert process.stdout

        messages: List[str] = []

        async for line in process.stdout:
            logger.debug('Read line from helper: %s', line)
            command, _, value = line.strip().decode().partition(' ')

            # usually: PAM_PROMPT_ECHO_OFF Password: \n
            if command.startswith('PAM_PROMPT'):
                # Don't pass this to the UI if it's "Password" (the usual case),
                # so that superuser.py uses the translated default
                if value.startswith('Password'):
                    value = ''

                # flush out accumulated info/error messages
                passwd = await self.responder.do_askpass('\n'.join(messages), value, '')
                messages.clear()
                if passwd is None:
                    logger.debug('got PAM_PROMPT %s, but do_askpass returned None', value)
                    raise asyncio.CancelledError('no password given')
                logger.debug('got PAM_PROMPT %s, do_askpass returned a password', value)
                process.stdin.write(passwd.encode())
                process.stdin.write(b'\n')
                del passwd  # don't keep this around longer than necessary
                await process.stdin.drain()
                logger.debug('got PAM_PROMPT, wrote password to helper')
            elif command in ('PAM_TEXT_INFO', 'PAM_ERROR'):
                messages.append(value)
            elif command == 'SUCCESS':
                logger.debug('Authentication succeeded')
                break
            elif command == 'FAILURE':
                logger.warning('Authentication failed')
                break
            else:
                logger.warning('Unknown line from helper, aborting: %s', line)
                process.terminate()
                break


class PolkitAgent:
    """Register polkit agent when required

    Use this as a context manager to ensure that the agent gets unregistered again.

    Registration is skipped (with a log message) when the system bus cannot
    be reached or when XDG_SESSION_ID is not set in the environment; in both
    cases entering the context still succeeds and leaving it does nothing.
    """

    def __init__(self, responder: AskpassHandler):
        """Remember the responder; nothing is registered until the context is entered."""
        self.responder = responder
        # slot of the exported agent object; stays None while no agent is registered
        self.agent_slot = None

    async def __aenter__(self) -> 'PolkitAgent':
        """Export the agent object and register it with polkit for our session."""
        try:
            self.system_bus = bus.Bus.default_system()
        except OSError as e:
            logger.warning('cannot connect to system bus, not registering polkit agent: %s', e)
            return self

        try:
            # may refine that with a D-Bus call to logind
            self.subject = ('unix-session', {'session-id': Variant(os.environ['XDG_SESSION_ID'], 's')})
        except KeyError:
            logger.debug('XDG_SESSION_ID not set, not registering polkit agent')
            return self

        agent_object = org_freedesktop_PolicyKit1_AuthenticationAgent(self.responder)
        self.agent_slot = self.system_bus.add_object(AGENT_DBUS_PATH, agent_object)

        # register agent
        locale_name = locale.setlocale(locale.LC_MESSAGES, None)
        await self.system_bus.call_method_async(
            'org.freedesktop.PolicyKit1',
            '/org/freedesktop/PolicyKit1/Authority',
            'org.freedesktop.PolicyKit1.Authority',
            'RegisterAuthenticationAgent',
            '(sa{sv})ss',
            self.subject, locale_name, AGENT_DBUS_PATH)
        logger.debug('Registered agent for %r and locale %s', self.subject, locale_name)
        return self

    async def __aexit__(self, _exc_type, _exc_value, _traceback) -> None:
        """Unregister the agent from polkit and drop the exported object, if any."""
        if self.agent_slot:
            try:
                await self.system_bus.call_method_async(
                    'org.freedesktop.PolicyKit1',
                    '/org/freedesktop/PolicyKit1/Authority',
                    'org.freedesktop.PolicyKit1.Authority',
                    'UnregisterAuthenticationAgent',
                    '(sa{sv})s',
                    self.subject, AGENT_DBUS_PATH)
            finally:
                # drop the exported object even when the unregister call fails,
                # so that it does not stay behind on the bus
                self.agent_slot.cancel()
            logger.debug('Unregistered agent for %r', self.subject)
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply