| import base64 |
| import io |
| import json |
| import os |
| import shutil |
| import time |
| from io import BytesIO |
| from queue import Queue |
|
|
| import paho.mqtt.client as mqtt |
| from PIL import Image |
|
|
|
|
| class MicroscopeDemo: |
| def __init__( |
| self, |
| host, |
| port, |
| username, |
| password, |
| microscope, |
| path_to_openflexure_stitching="OPTIONAL", |
| ): |
| self.host = host |
| self.port = port |
| self.username = username |
| self.password = password |
| self.microscope = microscope |
| self.path_to_openflexure_stitching = path_to_openflexure_stitching |
|
|
| self.client = mqtt.Client() |
| self.client.tls_set() |
| self.client.username_pw_set(self.username, self.password) |
|
|
| self.receiveq = Queue() |
|
|
| def on_message(client, userdata, message): |
| received = json.loads(message.payload.decode("utf-8")) |
| self.receiveq.put(received) |
| if len(json.dumps(received)) <= 300: |
| print(received) |
| else: |
| try: |
| print(json.dumps(received)[:300] + "...") |
| except Exception as e: |
| print(f"Command printing error (program will continue) {e}") |
|
|
| self.client.on_message = on_message |
|
|
| self.client.connect(self.host, port=self.port, keepalive=60, bind_address="") |
|
|
| self.client.loop_start() |
|
|
| self.client.subscribe(self.microscope + "/return", qos=2) |
|
|
| def scan_and_stitch( |
| self, c1, c2, temp, ov=1200, foc=0, output="Downloads/stitched.jpeg" |
| ): |
| """takes a scan with the same inputs + 2 more and outputs a stitched |
| image. output is the directory the stitched image will go to and temp is |
| the temporary directory to stitch the image otherwise it works just like |
| scan()""" |
| command = json.dumps( |
| {"command": "scan", "c1": c1, "c2": c2, "ov": ov, "foc": foc} |
| ) |
| self.client.publish( |
| self.microscope + "/command", payload=command, qos=2, retain=False |
| ) |
| while self.receiveq.empty(): |
| time.sleep(0.05) |
| image = self.receiveq.get() |
| image_list = image["images"] |
| if os.path.isdir(temp): |
| shutil.rmtree(temp) |
| os.makedirs(temp) |
| for i in range(len(image_list)): |
| image_bytes = base64.b64decode(image_list[i]) |
| with io.BytesIO(image_bytes) as buffer: |
| img = Image.open(buffer) |
| img.save( |
| temp + "/" + str(i) + ".jpeg", |
| format="JPEG", |
| exif=img.info.get("exif"), |
| ) |
| os.system( |
| "cd " |
| + self.path_to_openflexure_stitching |
| + " && python -m venv .venv && .\\.venv\\Scripts\\activate && openflexure-stitch " |
| + temp |
| ) |
| |
| |
|
|
| pos = len(temp) |
| for char in ("/", "\\"): |
| index = temp.rfind(char) |
| if index != -1 and index < pos: |
| pos = index |
| result = temp[pos + 1 :] if pos < len(temp) else temp |
|
|
| shutil.move(temp + "/" + result + "_stitched.jpg", output) |
|
|
| def move(self, x, y, z=False, relative=False): |
| """moves to given coordinates x, y (and z if it is set to any integer |
| value, if it is set to False the z value wont change). If relative is |
| True, then it will move relative to the current position instead of |
| moving to the absolute coordinates""" |
| command = json.dumps( |
| {"command": "move", "x": x, "y": y, "z": z, "relative": relative} |
| ) |
| self.client.publish( |
| self.microscope + "/command", payload=command, qos=2, retain=False |
| ) |
| while self.receiveq.empty(): |
| time.sleep(0.05) |
| return self.receiveq.get() |
|
|
| def scan(self, c1, c2, ov=1200, foc=0): |
| """returns a list of image objects. Takes images to scan an entire area |
| specified by two corners. you can input the corner coordinates as "x1 |
| y1", "x2, y2" or [x1, y1], [x2, y2]. ov refers to the overlap between |
| the images (useful for stitching) and foc refers to how much the |
| microscope should focus between images (0 to disable)""" |
| command = json.dumps( |
| {"command": "scan", "c1": c1, "c2": c2, "ov": ov, "foc": foc} |
| ) |
| self.client.publish( |
| self.microscope + "/command", payload=command, qos=2, retain=False |
| ) |
| while self.receiveq.empty(): |
| time.sleep(0.05) |
| image_l = self.receiveq.get() |
| image_list = image_l["images"] |
| for i in range(len(image_list)): |
| image = image_list[i] |
| image_bytes = base64.b64decode(image) |
| image_object = Image.open(BytesIO(image_bytes)) |
| image_list[i] = image_object |
| return image_list |
|
|
| def focus(self, amount="fast"): |
| """focuses by different amounts: huge, fast, medium, fine, or any |
| integer value""" |
| command = json.dumps({"command": "focus", "amount": amount}) |
| self.client.publish( |
| self.microscope + "/command", payload=command, qos=2, retain=False |
| ) |
| while self.receiveq.empty(): |
| time.sleep(0.05) |
| return self.receiveq.get() |
|
|
| def get_pos( |
| self, |
| ): |
| """returns a dictionary with x, y, and z coordinates eg. |
| {'x':1,'y':2,'z':3}""" |
| command = json.dumps({"command": "get_pos"}) |
| self.client.publish( |
| self.microscope + "/command", payload=command, qos=2, retain=False |
| ) |
| while self.receiveq.empty(): |
| time.sleep(0.05) |
| pos = self.receiveq.get() |
| return pos["pos"] |
|
|
| def take_image(self): |
| """returns an image object""" |
| command = json.dumps({"command": "take_image"}) |
| self.client.publish( |
| self.microscope + "/command", payload=command, qos=2, retain=False |
| ) |
| while self.receiveq.empty(): |
| time.sleep(0.05) |
| image = self.receiveq.get() |
| image_string = image["image"] |
| image_bytes = base64.b64decode(image_string) |
| image_object = Image.open(BytesIO(image_bytes)) |
| return image_object |
|
|
| def end_connection(self): |
| self.client.loop_stop() |
| self.client.disconnect() |
|
|