Skip to content

DaMiaoMotor

damiao_motor.motor.DaMiaoMotor

Lightweight DaMiao motor wrapper over a CAN bus.

Source code in damiao_motor/motor.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
class DaMiaoMotor:
    """
    Lightweight DaMiao motor wrapper over a CAN bus.
    """

    def __init__(
        self,
        motor_id: int,
        feedback_id: int,
        bus: can.Bus,
        *,
        motor_type: str,
        p_min: Optional[float] = None,
        p_max: Optional[float] = None,
        v_min: Optional[float] = None,
        v_max: Optional[float] = None,
        t_min: Optional[float] = None,
        t_max: Optional[float] = None,
    ) -> None:
        self.motor_id = motor_id
        self.feedback_id = feedback_id
        self.bus = bus

        # Resolve P/V/T limits from motor_type preset + optional overrides. kp and kd use fixed KP_MIN/KP_MAX, KD_MIN/KD_MAX.
        base = self._resolve_limits(motor_type)
        overrides = {
            k: v
            for k, v in (
                ("p_min", p_min), ("p_max", p_max),
                ("v_min", v_min), ("v_max", v_max),
                ("t_min", t_min), ("t_max", t_max),
            )
            if v is not None
        }
        base.update(overrides)
        for k in _LIMITS_KEYS:
            setattr(self, f"_{k}", base[k])

        self.motor_type = motor_type

        # last decoded feedback
        self.state: Dict[str, Any] = {}
        self.state_lock = threading.Lock()

        # last register values
        self.registers: Dict[int, float | int] = {}
        self.registers_lock = threading.Lock()
        self.register_request_time: Dict[int, float] = {}
        self.register_request_time_lock = threading.Lock()
        self.register_reply_time: Dict[int, float] = {}
        self.register_reply_time_lock = threading.Lock()

    def _resolve_limits(self, motor_type: str) -> Dict[str, float]:
        """Resolve limits from motor_type preset. Returns a dict of the 6 P/V/T limit values."""
        if motor_type not in MOTOR_TYPE_PRESETS:
            raise ValueError(
                f"Unknown motor_type: {motor_type!r}. "
                f"Known: {list(MOTOR_TYPE_PRESETS.keys())}"
            )
        return dict(MOTOR_TYPE_PRESETS[motor_type])

    def set_motor_type(self, motor_type: str) -> None:
        """Update motor type and P/V/T limits from a preset. Validates against MOTOR_TYPE_PRESETS."""
        if motor_type not in MOTOR_TYPE_PRESETS:
            raise ValueError(
                f"Unknown motor_type: {motor_type!r}. "
                f"Known: {list(MOTOR_TYPE_PRESETS.keys())}"
            )
        base = dict(MOTOR_TYPE_PRESETS[motor_type])
        for k in _LIMITS_KEYS:
            setattr(self, f"_{k}", base[k])
        self.motor_type = motor_type

    def get_states(self) -> Dict[str, Any]:
        """
        Get the current motor state dictionary.

        Returns:
            Dictionary containing current motor state information:
            - can_id: CAN ID
            - status: Human-readable status name
            - status_code: Status code
            - pos: Position
            - vel: Velocity
            - torq: Torque
            - t_mos: MOSFET temperature
            - t_rotor: Rotor temperature
            - arbitration_id: CAN arbitration ID
        """
        return self.state.copy() if self.state else {}

    # -----------------------
    # Encode messages
    # -----------------------
    def encode_cmd_msg(self, pos: float, vel: float, torq: float, kp: float, kd: float) -> bytes:
        """
        Encode a command to CAN frame for sending to the motor.
        Uses this motor's P/V/T limits (from motor_type preset) and fixed kp (KP_MIN/KP_MAX), kd (KD_MIN/KD_MAX).
        """
        pos_u = float_to_uint(pos, self._p_min, self._p_max, 16)
        vel_u = float_to_uint(vel, self._v_min, self._v_max, 12)
        kp_u = float_to_uint(kp, KP_MIN, KP_MAX, 12)
        kd_u = float_to_uint(kd, KD_MIN, KD_MAX, 12)
        torq_u = float_to_uint(torq, self._t_min, self._t_max, 12)

        data = [
            (pos_u >> 8) & 0xFF,
            pos_u & 0xFF,
            (vel_u >> 4) & 0xFF,
            ((vel_u & 0xF) << 4) | ((kp_u >> 8) & 0xF),
            kp_u & 0xFF,
            (kd_u >> 4) & 0xFF,
            ((kd_u & 0xF) << 4) | ((torq_u >> 8) & 0xF),
            torq_u & 0xFF,
        ]
        return bytes(data)

    @staticmethod
    def encode_enable_msg() -> bytes:
        return bytes([0xFF] * 7 + [0xFC])

    @staticmethod
    def encode_disable_msg() -> bytes:
        return bytes([0xFF] * 7 + [0xFD])

    @staticmethod
    def encode_save_position_zero_msg() -> bytes:
        """Encode save position zero command (sets current position to zero)."""
        return bytes([0xFF] * 7 + [0xFE])

    @staticmethod
    def encode_clear_error_msg() -> bytes:
        """Encode clear error command (clears motor errors like overheating)."""
        return bytes([0xFF] * 7 + [0xFB])

    # -----------------------
    # Sending CAN frames
    # -----------------------
    def send_raw(self, data: bytes, arbitration_id: int | None = None) -> None:
        """
        Send raw CAN message.

        Args:
            data: CAN message data bytes (must be 8 bytes)
            arbitration_id: CAN arbitration ID (defaults to motor_id if not specified)

        Raises:
            ValueError: If data is not 8 bytes or arbitration_id is invalid
            OSError: If CAN bus error occurs (e.g., Error Code 105 - No buffer space)
            can.CanError: If CAN-specific error occurs
            AttributeError: If bus is not initialized
        """
        if len(data) != 8:
            raise ValueError(f"CAN message data must be 8 bytes, got {len(data)} bytes")

        if arbitration_id is None:
            arbitration_id = self.motor_id

        if arbitration_id < 0 or arbitration_id > 0x7FF:
            raise ValueError(f"Invalid arbitration_id: {arbitration_id}. Must be in range 0-0x7FF")

        try:
            msg = can.Message(arbitration_id=arbitration_id, data=data, is_extended_id=False)
            self.bus.send(msg)
        except OSError as e:
            error_str = str(e)
            errno = getattr(e, 'errno', None)

            # Error Code 105: No buffer space available
            if errno == 105 or "Error Code 105" in error_str or "No buffer space available" in error_str or "[Errno 105]" in error_str:
                raise OSError(
                    f"CAN bus buffer full (Error Code 105) when sending to arbitration_id 0x{arbitration_id:03X}. "
                    f"This typically indicates: no motor connected, motor not powered, or CAN hardware issue. "
                    f"Original error: {e}"
                ) from e
            # Other OSError cases
            raise OSError(f"CAN bus system error when sending to arbitration_id 0x{arbitration_id:03X}: {e}") from e
        except can.CanError as e:
            raise can.CanError(f"CAN bus error when sending to arbitration_id 0x{arbitration_id:03X}: {e}") from e
        except AttributeError as e:
            if "bus" in str(e).lower() or "send" in str(e).lower():
                raise AttributeError(f"CAN bus not initialized. Bus may be closed or not connected.") from e
            raise
        except Exception as e:
            raise RuntimeError(f"Unexpected error sending CAN message to arbitration_id 0x{arbitration_id:03X}: {e}") from e

    def enable(self) -> None:
        self.send_raw(self.encode_enable_msg())

    def disable(self) -> None:
        self.send_raw(self.encode_disable_msg())

    def set_zero_position(self) -> None:
        """Set the current output shaft position to zero."""
        self.send_raw(self.encode_save_position_zero_msg())

    def set_zero_command(self) -> None:
        """Send zero command (pos=0, vel=0, torq=0, kp=0, kd=0)."""
        self.send_cmd(target_position=0.0, target_velocity=0.0, stiffness=0.0, damping=0.0, feedforward_torque=0.0)

    def ensure_control_mode(self, control_mode: str) -> None:
        """
        Ensure control mode (register 10) matches the desired mode.
        Reads register 10; if it differs, writes and verifies.

        Args:
            control_mode: "MIT", "POS_VEL", "VEL", or "FORCE_POS"

        Raises:
            ValueError: If control_mode is invalid or register value is invalid
            TimeoutError: If reading/writing register times out
            RuntimeError: Other errors during register operations, or if verification after write fails
        """
        mode_to_register = {"MIT": 1, "POS_VEL": 2, "VEL": 3, "FORCE_POS": 4}
        if control_mode not in mode_to_register:
            raise ValueError(f"Invalid control_mode: {control_mode}. Must be one of {list(mode_to_register.keys())}")
        desired = mode_to_register[control_mode]

        try:
            current = int(self.get_register(10, timeout=1.0))
            if current == desired:
                return
            print(f"⚠ Control mode mismatch: register 10 = {current}, required = {desired}")
            print(f"  Setting control mode to {control_mode} (register value: {desired})...")
            self.write_register(10, desired)
            time.sleep(0.1)
            verify = int(self.get_register(10, timeout=1.0))
            if verify != desired:
                raise RuntimeError(
                    f"Control mode verification failed after write: expected {desired}, got {verify}"
                )
            print(f"✓ Control mode set to {control_mode}")
        except TimeoutError as e:
            raise TimeoutError(f"Timeout while checking/setting control mode (register 10): {e}") from e
        except ValueError as e:
            raise ValueError(f"Invalid control mode value in register 10: {e}") from e
        except RuntimeError:
            raise  # verification failure, preserve message
        except Exception as e:
            raise RuntimeError(f"Error checking/setting control mode: {e}") from e

    def set_can_timeout(self, timeout_ms: int) -> None:
        """
        Set CAN timeout alarm time (register 9).

        Args:
            timeout_ms: Timeout in milliseconds

        Note:
            Register 9 stores timeout in units of 50 microseconds: **1 register unit = 50 microseconds**.

            Conversion formula: register_value = timeout_ms × 20

            Examples:
            - 1000 ms = 1,000,000 microseconds = 20,000 register units
            - 50 ms = 50,000 microseconds = 1,000 register units
        """
        # Convert milliseconds to register units: 1 register unit = 50 microseconds
        # timeout_ms * 1000 us/ms / 50 us/unit = timeout_ms * 20
        register_value = timeout_ms * 20
        self.write_register(9, register_value)

    def clear_error(self) -> None:
        """Clear motor errors (e.g., overheating)."""
        self.send_raw(self.encode_clear_error_msg())

    def send_cmd(
        self,
        target_position: float = 0.0,
        target_velocity: float = 0.0,
        stiffness: float = 0.0,
        damping: float = 0.0,
        feedforward_torque: float = 0.0,
        control_mode: str = "MIT",
        velocity_limit: float = 0.0,
        current_limit: float = 0.0,
    ) -> None:
        """
        Send command to motor with specified control mode.

        Args:
            target_position: Desired position (radians)
            target_velocity: Desired velocity (rad/s)
            stiffness: Stiffness (kp) for MIT mode
            damping: Damping (kd) for MIT mode
            feedforward_torque: Feedforward torque for MIT mode
            control_mode: Control mode - "MIT" (default), "POS_VEL", "VEL", or "FORCE_POS"
            velocity_limit: Velocity limit (rad/s, 0-100) for FORCE_POS mode
            current_limit: Current limit normalized (0.0-1.0) for FORCE_POS mode

        Note:
            Before using this method to send commands, ensure that the motor's control mode register (register 10)
            is set to match the desired control_mode argument ("MIT", "POS_VEL", "VEL", or "FORCE_POS").
            If the register does not match, the motor will not respond to commands and will not move.
        """
        # Check if motor is disabled and enable it if necessary
        if self.state and self.state.get("status_code") == DM_MOTOR_DISABLED:
            self.enable()

        # Check if motor has lost communication and clear error if necessary
        if self.state and self.state.get("status_code") == DM_MOTOR_LOST_COMM:
            self.clear_error()

        if control_mode == "MIT":
            # MIT-style control mode (default)
            data = self.encode_cmd_msg(target_position, target_velocity, feedforward_torque, stiffness, damping)
            self.send_raw(data)
        elif control_mode == "POS_VEL":
            # POS_VEL Mode: CAN ID 0x100 + motor_id
            data = struct.pack('<ff', target_position, target_velocity)
            arbitration_id = 0x100 + self.motor_id
            self.send_raw(data, arbitration_id=arbitration_id)
        elif control_mode == "VEL":
            # VEL Mode: CAN ID 0x200 + motor_id
            data = struct.pack('<f', target_velocity) + b'\x00' * 4
            arbitration_id = 0x200 + self.motor_id
            self.send_raw(data, arbitration_id=arbitration_id)
        elif control_mode == "FORCE_POS":
            # FORCE_POS Mode: CAN ID 0x300 + motor_id
            # Clamp and scale velocity limit (0-100 rad/s -> 0-10000)
            v_des_clamped = max(0.0, min(100.0, velocity_limit))
            v_des_scaled = int(v_des_clamped * 100)
            v_des_scaled = min(10000, v_des_scaled)

            # Clamp and scale current limit (0.0-1.0 -> 0-10000)
            i_des_clamped = max(0.0, min(1.0, current_limit))
            i_des_scaled = int(i_des_clamped * 10000)
            i_des_scaled = min(10000, i_des_scaled)

            # Pack: float (4 bytes) + uint16 (2 bytes) + uint16 (2 bytes)
            data = struct.pack('<fHH', target_position, v_des_scaled, i_des_scaled)
            arbitration_id = 0x300 + self.motor_id
            self.send_raw(data, arbitration_id=arbitration_id)
        else:
            raise ValueError(f"Unknown control_mode: {control_mode}. Must be 'MIT', 'POS_VEL', 'VEL', or 'FORCE_POS'")

    # -----------------------
    # Decode feedback
    # -----------------------
    def process_feedback_frame(self, data: bytes, arbitration_id: int | None = None) -> None:
        if is_register_reply(data):
            self.decode_register_reply(data, arbitration_id=arbitration_id)

        else:
            self.decode_sensor_feedback(data, arbitration_id=arbitration_id)

    def decode_register_reply(self, data: bytes, arbitration_id: int | None = None) -> None:
        with self.register_reply_time_lock:
            # record the time of the register reply
            self.register_reply_time[data[3]] = time.perf_counter()
        with self.registers_lock:
            # unpack with corresponding data type
            if REGISTER_TABLE[data[3]].data_type == "float":
                self.registers[data[3]] = struct.unpack("<f", data[4:8])[0]
            elif REGISTER_TABLE[data[3]].data_type == "uint32":
                self.registers[data[3]] = struct.unpack("<I", data[4:8])[0]
            else:
                raise ValueError(f"Unknown data_type: {REGISTER_TABLE[data[3]].data_type} for register {data[3]}")
            return

    def decode_sensor_feedback(self, data: bytes, arbitration_id: int | None = None) -> Dict[str, float]:
        if len(data) != 8:
            raise ValueError("Feedback frame must have length 8")

        can_id = data[0] & 0x0F
        status = data[0] >> 4
        pos_int = (data[1] << 8) | data[2]
        vel_int = (data[3] << 4) | (data[4] >> 4)
        torq_int = ((data[4] & 0xF) << 8) | data[5]
        t_mos = float(data[6])
        t_rotor = float(data[7])

        decoded = {
            "can_id": can_id,
            "arbitration_id": arbitration_id,
            "status": decode_state_name(status),
            "status_code": status,
            "pos": uint_to_float(pos_int, self._p_min, self._p_max, 16),
            "vel": uint_to_float(vel_int, self._v_min, self._v_max, 12),
            "torq": uint_to_float(torq_int, self._t_min, self._t_max, 12),
            "t_mos": t_mos,
            "t_rotor": t_rotor,
        }
        self.state = decoded
        return decoded

    # -----------------------
    # Register read/write operations
    # -----------------------
    def _encode_can_id(self, can_id: int) -> tuple[int, int]:
        """Encode CAN ID into low and high bytes."""
        return can_id & 0xFF, (can_id >> 8) & 0xFF

    def _send_register_cmd(self, cmd_byte: int, rid: int, data: Optional[bytes] = None) -> None:
        """
        Send a register command (read/write/store).

        Args:
            cmd_byte: Command byte (0x33 for read, 0x55 for write, 0xAA for store)
            rid: Register ID (0-81)
            data: Optional 4-byte data for write operations
        """
        canid_l, canid_h = self._encode_can_id(self.motor_id)

        if data is None:
            # Read or store command - D[4-7] are don't care
            msg_data = bytes([canid_l, canid_h, cmd_byte, rid, 0x00, 0x00, 0x00, 0x00])
        else:
            # Write command - D[4-7] contain the data
            if len(data) != 4:
                raise ValueError("Data must be 4 bytes for write operations")
            msg_data = bytes([canid_l, canid_h, cmd_byte, rid]) + data

        self.send_raw(msg_data, arbitration_id=0x7FF)

    def request_register_reading(self, rid: int) -> None:
        """
        Request a register reading from the motor.
        """
        with self.register_request_time_lock:
            self.register_request_time[rid] = time.perf_counter()
        self._send_register_cmd(0x33, rid)

    def get_register(self, rid: int, timeout: float = 1.0) -> float | int:
        """
        Read a register value from the motor.

        If the value is not already cached, sends a read request and waits for the
        controller's background polling to receive the reply. The motor never
        reads from the bus; only the controller's poll_feedback does, avoiding
        multiple consumers.

        Requires the motor to be managed by a DaMiaoController (added via
        controller.add_motor). Standalone motors can only return cached values.

        Args:
            rid: Register ID (0-81)
            timeout: Timeout in seconds to wait for response

        Returns:
            Register value as float or int depending on register data type

        Raises:
            KeyError: If register ID is not in the register table
            RuntimeError: If the motor is not managed by a controller (no background polling)
            TimeoutError: If the register reply was not received within timeout
        """
        if rid not in REGISTER_TABLE:
            raise KeyError(f"Register {rid} not found in register table")
        with self.registers_lock:
            if rid in self.registers:
                return self.registers[rid]
        if getattr(self, "_controller", None) is None:
            raise RuntimeError(
                "get_register requires the motor to be managed by a DaMiaoController "
                "(added via controller.add_motor). The controller's background polling "
                "is the only bus reader; standalone motors cannot block-wait for register replies."
            )
        self.request_register_reading(rid)
        deadline = time.time() + timeout
        while True:
            with self.registers_lock:
                if rid in self.registers:
                    return self.registers[rid]
            if time.time() >= deadline:
                raise TimeoutError(f"Register {rid} not received within {timeout}s")
            time.sleep(0.01)

    def write_register(self, rid: int, value: float | int) -> None:
        """
        Write a value to a register.

        Args:
            rid: Register ID (0-81)
            value: Value to write (float or int)

        Raises:
            KeyError: If register ID is not in the register table
            ValueError: If register is read-only or value is out of range
        """
        # Check if register exists in table
        if rid not in REGISTER_TABLE:
            raise KeyError(f"Register {rid} not found in register table")

        reg_info = REGISTER_TABLE[rid]

        # Check if register is writable
        if reg_info.access != "RW":
            raise ValueError(f"Register {rid} ({reg_info.variable}) is read-only (access: {reg_info.access})")

        # Encode value to 4 bytes using data type from register table
        if reg_info.data_type == "float":
            data_bytes = struct.pack("<f", float(value))
        elif reg_info.data_type == "uint32":
            data_bytes = struct.pack("<I", int(value))
        else:
            raise ValueError(f"Unknown data_type: {reg_info.data_type} for register {rid}")

        # Send write command
        self._send_register_cmd(0x55, rid, data_bytes)

        # Wait for echo response (optional - motor echoes back the written data)
        # Note: You may want to verify the echo matches, but for now we just send

    def get_register_info(self, rid: int) -> RegisterInfo:
        """
        Get information about a register.

        Args:
            rid: Register ID

        Returns:
            RegisterInfo object with register details

        Raises:
            KeyError: If register ID is not in the register table
        """
        if rid not in REGISTER_TABLE:
            raise KeyError(f"Register {rid} not found in register table")
        return REGISTER_TABLE[rid]

    def store_parameters(self) -> None:
        """
        Store all parameters to flash memory.
        After successful write, all parameters will be written to the chip.
        """
        self._send_register_cmd(0xAA, 0x01)

    def request_motor_feedback(self) -> None:
        """
        Request motor feedback/status information.
        After successful transmission, the motor driver will return current status information.
        """
        canid_l, canid_h = self._encode_can_id(self.motor_id)
        msg_data = bytes([canid_l, canid_h, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00])
        self.send_raw(msg_data, arbitration_id=0x7FF)

    def read_all_registers(self, timeout: float = 0.05) -> Dict[int, float | int]:
        """
        Read all registers from the motor.

        Args:
            timeout: Timeout in seconds per register read

        Returns:
            Dictionary mapping register ID to value
        """
        for rid, reg_info in REGISTER_TABLE.items():
            if reg_info.access in ["RO", "RW"]:  # Readable registers
                self.request_register_reading(rid)
                time.sleep(0.0005)
        results: Dict[int, float | int] = {}
        time.sleep(0.01)  # wait for the replies
        for rid, reg_info in REGISTER_TABLE.items():
            if reg_info.access in ["RO", "RW"]:  # Readable registers
                try:
                    results[rid] = self.get_register(rid, timeout=timeout)
                except (TimeoutError, KeyError, ValueError, RuntimeError) as e:
                    # Store error as string for debugging
                    results[rid] = f"ERROR: {e}"
        return results

    # -----------------------
    # Limit setters and mapping helpers
    # -----------------------
    def set_p_limits(self, p_min: float, p_max: float) -> None:
        """Set position limits used for encode/decode (MIT mode)."""
        self._p_min, self._p_max = p_min, p_max

    def set_v_limits(self, v_min: float, v_max: float) -> None:
        """Set velocity limits used for encode/decode (MIT mode)."""
        self._v_min, self._v_max = v_min, v_max

    def set_t_limits(self, t_min: float, t_max: float) -> None:
        """Set torque limits used for encode/decode (MIT mode)."""
        self._t_min, self._t_max = t_min, t_max

    def set_limits(
        self,
        *,
        p_min: Optional[float] = None,
        p_max: Optional[float] = None,
        v_min: Optional[float] = None,
        v_max: Optional[float] = None,
        t_min: Optional[float] = None,
        t_max: Optional[float] = None,
    ) -> None:
        """Update only the specified P/V/T limits. Omitted keys are left unchanged. kp and kd are fixed (KP_MIN/KP_MAX, KD_MIN/KD_MAX)."""
        if p_min is not None:
            self._p_min = p_min
        if p_max is not None:
            self._p_max = p_max
        if v_min is not None:
            self._v_min = v_min
        if v_max is not None:
            self._v_max = v_max
        if t_min is not None:
            self._t_min = t_min
        if t_max is not None:
            self._t_max = t_max

    # -----------------------
    # Setter methods for all writable registers
    # -----------------------
    def set_under_voltage_protection(self, value: float) -> None:
        """Set under-voltage protection value (register 0)."""
        self.write_register(0, value)

    def set_torque_coefficient(self, value: float) -> None:
        """Set torque coefficient (register 1)."""
        self.write_register(1, value)

    def set_over_temperature_protection(self, value: float) -> None:
        """Set over-temperature protection value (register 2)."""
        self.write_register(2, value)

    def set_over_current_protection(self, value: float) -> None:
        """Set over-current protection value (register 3)."""
        self.write_register(3, value)

    def set_acceleration(self, value: float) -> None:
        """Set acceleration (register 4)."""
        self.write_register(4, value)

    def set_deceleration(self, value: float) -> None:
        """Set deceleration (register 5)."""
        self.write_register(5, value)

    def set_maximum_speed(self, value: float) -> None:
        """Set maximum speed (register 6)."""
        self.write_register(6, value)

    def set_feedback_id(self, value: int) -> None:
        """Set feedback ID (register 7)."""
        self.write_register(7, value)

    def set_receive_id(self, value: int) -> None:
        """Set receive ID (register 8)."""
        self.write_register(8, value)

    def set_timeout_alarm(self, value: int) -> None:
        """
        Set timeout alarm time (register 9).

        Args:
            value: Timeout value in register units (1 unit = 50 microseconds)

        Note:
            This method writes the raw register value. For convenience, use `set_can_timeout()`
            which accepts milliseconds and handles the conversion automatically.

            Conversion: 1 register unit = 50 microseconds
            To convert from milliseconds: register_value = timeout_ms × 20
        """
        self.write_register(9, value)

    def set_control_mode(self, value: int) -> None:
        """Set control mode (register 10)."""
        self.write_register(10, value)

    def set_position_mapping_range(self, value: float) -> None:
        """Set position mapping range (register 21)."""
        self.write_register(21, value)

    def set_speed_mapping_range(self, value: float) -> None:
        """Set speed mapping range (register 22)."""
        self.write_register(22, value)

    def set_torque_mapping_range(self, value: float) -> None:
        """Set torque mapping range (register 23)."""
        self.write_register(23, value)

    def set_current_loop_bandwidth(self, value: float) -> None:
        """Set current loop control bandwidth (register 24)."""
        self.write_register(24, value)

    def set_speed_loop_kp(self, value: float) -> None:
        """Set speed loop proportional gain Kp (register 25)."""
        self.write_register(25, value)

    def set_speed_loop_ki(self, value: float) -> None:
        """Set speed loop integral gain Ki (register 26)."""
        self.write_register(26, value)

    def set_position_loop_kp(self, value: float) -> None:
        """Set position loop proportional gain Kp (register 27)."""
        self.write_register(27, value)

    def set_position_loop_ki(self, value: float) -> None:
        """Set position loop integral gain Ki (register 28)."""
        self.write_register(28, value)

    def set_overvoltage_protection(self, value: float) -> None:
        """Set overvoltage protection value (register 29)."""
        self.write_register(29, value)

    def set_gear_efficiency(self, value: float) -> None:
        """Set gear torque efficiency (register 30)."""
        self.write_register(30, value)

    def set_speed_loop_damping(self, value: float) -> None:
        """Set speed loop damping coefficient (register 31)."""
        self.write_register(31, value)

    def set_speed_loop_filter_bandwidth(self, value: float) -> None:
        """Set speed loop filter bandwidth (register 32)."""
        self.write_register(32, value)

    def set_current_loop_enhancement(self, value: float) -> None:
        """Set current loop enhancement coefficient (register 33)."""
        self.write_register(33, value)

    def set_speed_loop_enhancement(self, value: float) -> None:
        """Set speed loop enhancement coefficient (register 34)."""
        self.write_register(34, value)

    def set_can_baud_rate(self, baud_rate_code: int) -> None:
        """
        Set CAN baud rate using register 35 (can_br).

        Args:
            baud_rate_code: Baud rate code (0=125K, 1=200K, 2=250K, 3=500K, 4=1M)

        Raises:
            ValueError: If baud_rate_code is not in valid range [0, 4]
        """
        if baud_rate_code not in CAN_BAUD_RATE_CODES:
            raise ValueError(f"Invalid baud rate code: {baud_rate_code}. Must be in {list(CAN_BAUD_RATE_CODES.keys())}")

        self.write_register(35, baud_rate_code)  # Register 35 is can_br
        self.store_parameters()  # Store to flash so it persists

clear_error

clear_error() -> None

Clear motor errors (e.g., overheating).

Source code in damiao_motor/motor.py
def clear_error(self) -> None:
    """Clear motor errors (e.g., overheating)."""
    self.send_raw(self.encode_clear_error_msg())

encode_clear_error_msg staticmethod

encode_clear_error_msg() -> bytes

Encode clear error command (clears motor errors like overheating).

Source code in damiao_motor/motor.py
@staticmethod
def encode_clear_error_msg() -> bytes:
    """Encode clear error command (clears motor errors like overheating)."""
    return bytes([0xFF] * 7 + [0xFB])

encode_cmd_msg

encode_cmd_msg(pos: float, vel: float, torq: float, kp: float, kd: float) -> bytes

Encode a command to CAN frame for sending to the motor. Uses this motor's P/V/T limits (from motor_type preset) and fixed kp (KP_MIN/KP_MAX), kd (KD_MIN/KD_MAX).

Source code in damiao_motor/motor.py
def encode_cmd_msg(self, pos: float, vel: float, torq: float, kp: float, kd: float) -> bytes:
    """
    Encode a command to CAN frame for sending to the motor.
    Uses this motor's P/V/T limits (from motor_type preset) and fixed kp (KP_MIN/KP_MAX), kd (KD_MIN/KD_MAX).
    """
    pos_u = float_to_uint(pos, self._p_min, self._p_max, 16)
    vel_u = float_to_uint(vel, self._v_min, self._v_max, 12)
    kp_u = float_to_uint(kp, KP_MIN, KP_MAX, 12)
    kd_u = float_to_uint(kd, KD_MIN, KD_MAX, 12)
    torq_u = float_to_uint(torq, self._t_min, self._t_max, 12)

    data = [
        (pos_u >> 8) & 0xFF,
        pos_u & 0xFF,
        (vel_u >> 4) & 0xFF,
        ((vel_u & 0xF) << 4) | ((kp_u >> 8) & 0xF),
        kp_u & 0xFF,
        (kd_u >> 4) & 0xFF,
        ((kd_u & 0xF) << 4) | ((torq_u >> 8) & 0xF),
        torq_u & 0xFF,
    ]
    return bytes(data)

encode_save_position_zero_msg staticmethod

encode_save_position_zero_msg() -> bytes

Encode save position zero command (sets current position to zero).

Source code in damiao_motor/motor.py
@staticmethod
def encode_save_position_zero_msg() -> bytes:
    """Encode save position zero command (sets current position to zero)."""
    return bytes([0xFF] * 7 + [0xFE])

ensure_control_mode

ensure_control_mode(control_mode: str) -> None

Ensure control mode (register 10) matches the desired mode. Reads register 10; if it differs, writes and verifies.

Parameters:

Name Type Description Default
control_mode str

"MIT", "POS_VEL", "VEL", or "FORCE_POS"

required

Raises:

Type Description
ValueError

If control_mode is invalid or register value is invalid

TimeoutError

If reading/writing register times out

RuntimeError

Other errors during register operations, or if verification after write fails

Source code in damiao_motor/motor.py
def ensure_control_mode(self, control_mode: str) -> None:
    """
    Ensure control mode (register 10) matches the desired mode.
    Reads register 10; if it differs, writes and verifies.

    Args:
        control_mode: "MIT", "POS_VEL", "VEL", or "FORCE_POS"

    Raises:
        ValueError: If control_mode is invalid or register value is invalid
        TimeoutError: If reading/writing register times out
        RuntimeError: Other errors during register operations, or if verification after write fails
    """
    mode_to_register = {"MIT": 1, "POS_VEL": 2, "VEL": 3, "FORCE_POS": 4}
    if control_mode not in mode_to_register:
        raise ValueError(f"Invalid control_mode: {control_mode}. Must be one of {list(mode_to_register.keys())}")
    desired = mode_to_register[control_mode]

    try:
        current = int(self.get_register(10, timeout=1.0))
        if current == desired:
            return
        print(f"⚠ Control mode mismatch: register 10 = {current}, required = {desired}")
        print(f"  Setting control mode to {control_mode} (register value: {desired})...")
        self.write_register(10, desired)
        time.sleep(0.1)
        verify = int(self.get_register(10, timeout=1.0))
        if verify != desired:
            raise RuntimeError(
                f"Control mode verification failed after write: expected {desired}, got {verify}"
            )
        print(f"✓ Control mode set to {control_mode}")
    except TimeoutError as e:
        raise TimeoutError(f"Timeout while checking/setting control mode (register 10): {e}") from e
    except ValueError as e:
        raise ValueError(f"Invalid control mode value in register 10: {e}") from e
    except RuntimeError:
        raise  # verification failure, preserve message
    except Exception as e:
        raise RuntimeError(f"Error checking/setting control mode: {e}") from e

get_register

get_register(rid: int, timeout: float = 1.0) -> float | int

Read a register value from the motor.

If the value is not already cached, sends a read request and waits for the controller's background polling to receive the reply. The motor never reads from the bus; only the controller's poll_feedback does, avoiding multiple consumers.

Requires the motor to be managed by a DaMiaoController (added via controller.add_motor). Standalone motors can only return cached values.

Parameters:

Name Type Description Default
rid int

Register ID (0-81)

required
timeout float

Timeout in seconds to wait for response

1.0

Returns:

Type Description
float | int

Register value as float or int depending on register data type

Raises:

Type Description
KeyError

If register ID is not in the register table

RuntimeError

If the motor is not managed by a controller (no background polling)

TimeoutError

If the register reply was not received within timeout

Source code in damiao_motor/motor.py
def get_register(self, rid: int, timeout: float = 1.0) -> float | int:
    """
    Read a register value from the motor.

    If the value is not already cached, sends a read request and waits for the
    controller's background polling to receive the reply. The motor never
    reads from the bus; only the controller's poll_feedback does, avoiding
    multiple consumers.

    Requires the motor to be managed by a DaMiaoController (added via
    controller.add_motor). Standalone motors can only return cached values.

    Args:
        rid: Register ID (0-81)
        timeout: Timeout in seconds to wait for response

    Returns:
        Register value as float or int depending on register data type

    Raises:
        KeyError: If register ID is not in the register table
        RuntimeError: If the motor is not managed by a controller (no background polling)
        TimeoutError: If the register reply was not received within timeout
    """
    if rid not in REGISTER_TABLE:
        raise KeyError(f"Register {rid} not found in register table")
    with self.registers_lock:
        if rid in self.registers:
            return self.registers[rid]
    if getattr(self, "_controller", None) is None:
        raise RuntimeError(
            "get_register requires the motor to be managed by a DaMiaoController "
            "(added via controller.add_motor). The controller's background polling "
            "is the only bus reader; standalone motors cannot block-wait for register replies."
        )
    self.request_register_reading(rid)
    deadline = time.time() + timeout
    while True:
        with self.registers_lock:
            if rid in self.registers:
                return self.registers[rid]
        if time.time() >= deadline:
            raise TimeoutError(f"Register {rid} not received within {timeout}s")
        time.sleep(0.01)

get_register_info

get_register_info(rid: int) -> RegisterInfo

Get information about a register.

Parameters:

Name Type Description Default
rid int

Register ID

required

Returns:

Type Description
RegisterInfo

RegisterInfo object with register details

Raises:

Type Description
KeyError

If register ID is not in the register table

Source code in damiao_motor/motor.py
def get_register_info(self, rid: int) -> RegisterInfo:
    """
    Get information about a register.

    Args:
        rid: Register ID

    Returns:
        RegisterInfo object with register details

    Raises:
        KeyError: If register ID is not in the register table
    """
    if rid not in REGISTER_TABLE:
        raise KeyError(f"Register {rid} not found in register table")
    return REGISTER_TABLE[rid]

get_states

get_states() -> Dict[str, Any]

Get the current motor state dictionary.

Returns:

Type Description
Dict[str, Any]

Dictionary containing current motor state information:

Dict[str, Any]
  • can_id: CAN ID
Dict[str, Any]
  • status: Human-readable status name
Dict[str, Any]
  • status_code: Status code
Dict[str, Any]
  • pos: Position
Dict[str, Any]
  • vel: Velocity
Dict[str, Any]
  • torq: Torque
Dict[str, Any]
  • t_mos: MOSFET temperature
Dict[str, Any]
  • t_rotor: Rotor temperature
Dict[str, Any]
  • arbitration_id: CAN arbitration ID
Source code in damiao_motor/motor.py
def get_states(self) -> Dict[str, Any]:
    """
    Get the current motor state dictionary.

    Returns:
        Dictionary containing current motor state information:
        - can_id: CAN ID
        - status: Human-readable status name
        - status_code: Status code
        - pos: Position
        - vel: Velocity
        - torq: Torque
        - t_mos: MOSFET temperature
        - t_rotor: Rotor temperature
        - arbitration_id: CAN arbitration ID
    """
    return self.state.copy() if self.state else {}

read_all_registers

read_all_registers(timeout: float = 0.05) -> Dict[int, float | int]

Read all registers from the motor.

Parameters:

Name Type Description Default
timeout float

Timeout in seconds per register read

0.05

Returns:

Type Description
Dict[int, float | int]

Dictionary mapping register ID to value

Source code in damiao_motor/motor.py
def read_all_registers(self, timeout: float = 0.05) -> Dict[int, float | int]:
    """
    Read all registers from the motor.

    Args:
        timeout: Timeout in seconds per register read

    Returns:
        Dictionary mapping register ID to value
    """
    for rid, reg_info in REGISTER_TABLE.items():
        if reg_info.access in ["RO", "RW"]:  # Readable registers
            self.request_register_reading(rid)
            time.sleep(0.0005)
    results: Dict[int, float | int] = {}
    time.sleep(0.01)  # wait for the replies
    for rid, reg_info in REGISTER_TABLE.items():
        if reg_info.access in ["RO", "RW"]:  # Readable registers
            try:
                results[rid] = self.get_register(rid, timeout=timeout)
            except (TimeoutError, KeyError, ValueError, RuntimeError) as e:
                # Store error as string for debugging
                results[rid] = f"ERROR: {e}"
    return results

request_motor_feedback

request_motor_feedback() -> None

Request motor feedback/status information. After successful transmission, the motor driver will return current status information.

Source code in damiao_motor/motor.py
def request_motor_feedback(self) -> None:
    """
    Request motor feedback/status information.
    After successful transmission, the motor driver will return current status information.
    """
    canid_l, canid_h = self._encode_can_id(self.motor_id)
    msg_data = bytes([canid_l, canid_h, 0xCC, 0x00, 0x00, 0x00, 0x00, 0x00])
    self.send_raw(msg_data, arbitration_id=0x7FF)

request_register_reading

request_register_reading(rid: int) -> None

Request a register reading from the motor.

Source code in damiao_motor/motor.py
def request_register_reading(self, rid: int) -> None:
    """
    Request a register reading from the motor.
    """
    with self.register_request_time_lock:
        self.register_request_time[rid] = time.perf_counter()
    self._send_register_cmd(0x33, rid)

send_cmd

send_cmd(target_position: float = 0.0, target_velocity: float = 0.0, stiffness: float = 0.0, damping: float = 0.0, feedforward_torque: float = 0.0, control_mode: str = 'MIT', velocity_limit: float = 0.0, current_limit: float = 0.0) -> None

Send command to motor with specified control mode.

Parameters:

Name Type Description Default
target_position float

Desired position (radians)

0.0
target_velocity float

Desired velocity (rad/s)

0.0
stiffness float

Stiffness (kp) for MIT mode

0.0
damping float

Damping (kd) for MIT mode

0.0
feedforward_torque float

Feedforward torque for MIT mode

0.0
control_mode str

Control mode - "MIT" (default), "POS_VEL", "VEL", or "FORCE_POS"

'MIT'
velocity_limit float

Velocity limit (rad/s, 0-100) for FORCE_POS mode

0.0
current_limit float

Current limit normalized (0.0-1.0) for FORCE_POS mode

0.0
Note

Before using this method to send commands, ensure that the motor's control mode register (register 10) is set to match the desired control_mode argument ("MIT", "POS_VEL", "VEL", or "FORCE_POS"). If the register does not match, the motor will not respond to commands and will not move.

Source code in damiao_motor/motor.py
def send_cmd(
    self,
    target_position: float = 0.0,
    target_velocity: float = 0.0,
    stiffness: float = 0.0,
    damping: float = 0.0,
    feedforward_torque: float = 0.0,
    control_mode: str = "MIT",
    velocity_limit: float = 0.0,
    current_limit: float = 0.0,
) -> None:
    """
    Send command to motor with specified control mode.

    Args:
        target_position: Desired position (radians)
        target_velocity: Desired velocity (rad/s)
        stiffness: Stiffness (kp) for MIT mode
        damping: Damping (kd) for MIT mode
        feedforward_torque: Feedforward torque for MIT mode
        control_mode: Control mode - "MIT" (default), "POS_VEL", "VEL", or "FORCE_POS"
        velocity_limit: Velocity limit (rad/s, 0-100) for FORCE_POS mode
        current_limit: Current limit normalized (0.0-1.0) for FORCE_POS mode

    Note:
        Before using this method to send commands, ensure that the motor's control mode register (register 10)
        is set to match the desired control_mode argument ("MIT", "POS_VEL", "VEL", or "FORCE_POS").
        If the register does not match, the motor will not respond to commands and will not move.
    """
    # Check if motor is disabled and enable it if necessary
    if self.state and self.state.get("status_code") == DM_MOTOR_DISABLED:
        self.enable()

    # Check if motor has lost communication and clear error if necessary
    if self.state and self.state.get("status_code") == DM_MOTOR_LOST_COMM:
        self.clear_error()

    if control_mode == "MIT":
        # MIT-style control mode (default)
        data = self.encode_cmd_msg(target_position, target_velocity, feedforward_torque, stiffness, damping)
        self.send_raw(data)
    elif control_mode == "POS_VEL":
        # POS_VEL Mode: CAN ID 0x100 + motor_id
        data = struct.pack('<ff', target_position, target_velocity)
        arbitration_id = 0x100 + self.motor_id
        self.send_raw(data, arbitration_id=arbitration_id)
    elif control_mode == "VEL":
        # VEL Mode: CAN ID 0x200 + motor_id
        data = struct.pack('<f', target_velocity) + b'\x00' * 4
        arbitration_id = 0x200 + self.motor_id
        self.send_raw(data, arbitration_id=arbitration_id)
    elif control_mode == "FORCE_POS":
        # FORCE_POS Mode: CAN ID 0x300 + motor_id
        # Clamp and scale velocity limit (0-100 rad/s -> 0-10000)
        v_des_clamped = max(0.0, min(100.0, velocity_limit))
        v_des_scaled = int(v_des_clamped * 100)
        v_des_scaled = min(10000, v_des_scaled)

        # Clamp and scale current limit (0.0-1.0 -> 0-10000)
        i_des_clamped = max(0.0, min(1.0, current_limit))
        i_des_scaled = int(i_des_clamped * 10000)
        i_des_scaled = min(10000, i_des_scaled)

        # Pack: float (4 bytes) + uint16 (2 bytes) + uint16 (2 bytes)
        data = struct.pack('<fHH', target_position, v_des_scaled, i_des_scaled)
        arbitration_id = 0x300 + self.motor_id
        self.send_raw(data, arbitration_id=arbitration_id)
    else:
        raise ValueError(f"Unknown control_mode: {control_mode}. Must be 'MIT', 'POS_VEL', 'VEL', or 'FORCE_POS'")

send_raw

send_raw(data: bytes, arbitration_id: int | None = None) -> None

Send raw CAN message.

Parameters:

Name Type Description Default
data bytes

CAN message data bytes (must be 8 bytes)

required
arbitration_id int | None

CAN arbitration ID (defaults to motor_id if not specified)

None

Raises:

Type Description
ValueError

If data is not 8 bytes or arbitration_id is invalid

OSError

If CAN bus error occurs (e.g., Error Code 105 - No buffer space)

CanError

If CAN-specific error occurs

AttributeError

If bus is not initialized

Source code in damiao_motor/motor.py
def send_raw(self, data: bytes, arbitration_id: int | None = None) -> None:
    """
    Send raw CAN message.

    Args:
        data: CAN message data bytes (must be 8 bytes)
        arbitration_id: CAN arbitration ID (defaults to motor_id if not specified)

    Raises:
        ValueError: If data is not 8 bytes or arbitration_id is invalid
        OSError: If CAN bus error occurs (e.g., Error Code 105 - No buffer space)
        can.CanError: If CAN-specific error occurs
        AttributeError: If bus is not initialized
    """
    if len(data) != 8:
        raise ValueError(f"CAN message data must be 8 bytes, got {len(data)} bytes")

    if arbitration_id is None:
        arbitration_id = self.motor_id

    if arbitration_id < 0 or arbitration_id > 0x7FF:
        raise ValueError(f"Invalid arbitration_id: {arbitration_id}. Must be in range 0-0x7FF")

    try:
        msg = can.Message(arbitration_id=arbitration_id, data=data, is_extended_id=False)
        self.bus.send(msg)
    except OSError as e:
        error_str = str(e)
        errno = getattr(e, 'errno', None)

        # Error Code 105: No buffer space available
        if errno == 105 or "Error Code 105" in error_str or "No buffer space available" in error_str or "[Errno 105]" in error_str:
            raise OSError(
                f"CAN bus buffer full (Error Code 105) when sending to arbitration_id 0x{arbitration_id:03X}. "
                f"This typically indicates: no motor connected, motor not powered, or CAN hardware issue. "
                f"Original error: {e}"
            ) from e
        # Other OSError cases
        raise OSError(f"CAN bus system error when sending to arbitration_id 0x{arbitration_id:03X}: {e}") from e
    except can.CanError as e:
        raise can.CanError(f"CAN bus error when sending to arbitration_id 0x{arbitration_id:03X}: {e}") from e
    except AttributeError as e:
        if "bus" in str(e).lower() or "send" in str(e).lower():
            raise AttributeError(f"CAN bus not initialized. Bus may be closed or not connected.") from e
        raise
    except Exception as e:
        raise RuntimeError(f"Unexpected error sending CAN message to arbitration_id 0x{arbitration_id:03X}: {e}") from e

set_acceleration

set_acceleration(value: float) -> None

Set acceleration (register 4).

Source code in damiao_motor/motor.py
def set_acceleration(self, value: float) -> None:
    """Set acceleration (register 4)."""
    self.write_register(4, value)

set_can_baud_rate

set_can_baud_rate(baud_rate_code: int) -> None

Set CAN baud rate using register 35 (can_br).

Parameters:

Name Type Description Default
baud_rate_code int

Baud rate code (0=125K, 1=200K, 2=250K, 3=500K, 4=1M)

required

Raises:

Type Description
ValueError

If baud_rate_code is not in valid range [0, 4]

Source code in damiao_motor/motor.py
def set_can_baud_rate(self, baud_rate_code: int) -> None:
    """
    Set CAN baud rate using register 35 (can_br).

    Args:
        baud_rate_code: Baud rate code (0=125K, 1=200K, 2=250K, 3=500K, 4=1M)

    Raises:
        ValueError: If baud_rate_code is not in valid range [0, 4]
    """
    if baud_rate_code not in CAN_BAUD_RATE_CODES:
        raise ValueError(f"Invalid baud rate code: {baud_rate_code}. Must be in {list(CAN_BAUD_RATE_CODES.keys())}")

    self.write_register(35, baud_rate_code)  # Register 35 is can_br
    self.store_parameters()  # Store to flash so it persists

set_can_timeout

set_can_timeout(timeout_ms: int) -> None

Set CAN timeout alarm time (register 9).

Parameters:

Name Type Description Default
timeout_ms int

Timeout in milliseconds

required
Note

Register 9 stores timeout in units of 50 microseconds: 1 register unit = 50 microseconds.

Conversion formula: register_value = timeout_ms × 20

Examples: - 1000 ms = 1,000,000 microseconds = 20,000 register units - 50 ms = 50,000 microseconds = 1,000 register units

Source code in damiao_motor/motor.py
def set_can_timeout(self, timeout_ms: int) -> None:
    """
    Set CAN timeout alarm time (register 9).

    Args:
        timeout_ms: Timeout in milliseconds

    Note:
        Register 9 stores timeout in units of 50 microseconds: **1 register unit = 50 microseconds**.

        Conversion formula: register_value = timeout_ms × 20

        Examples:
        - 1000 ms = 1,000,000 microseconds = 20,000 register units
        - 50 ms = 50,000 microseconds = 1,000 register units
    """
    # Convert milliseconds to register units: 1 register unit = 50 microseconds
    # timeout_ms * 1000 us/ms / 50 us/unit = timeout_ms * 20
    register_value = timeout_ms * 20
    self.write_register(9, register_value)

set_control_mode

set_control_mode(value: int) -> None

Set control mode (register 10).

Source code in damiao_motor/motor.py
def set_control_mode(self, value: int) -> None:
    """Set control mode (register 10)."""
    self.write_register(10, value)

set_current_loop_bandwidth

set_current_loop_bandwidth(value: float) -> None

Set current loop control bandwidth (register 24).

Source code in damiao_motor/motor.py
def set_current_loop_bandwidth(self, value: float) -> None:
    """Set current loop control bandwidth (register 24)."""
    self.write_register(24, value)

set_current_loop_enhancement

set_current_loop_enhancement(value: float) -> None

Set current loop enhancement coefficient (register 33).

Source code in damiao_motor/motor.py
def set_current_loop_enhancement(self, value: float) -> None:
    """Set current loop enhancement coefficient (register 33)."""
    self.write_register(33, value)

set_deceleration

set_deceleration(value: float) -> None

Set deceleration (register 5).

Source code in damiao_motor/motor.py
def set_deceleration(self, value: float) -> None:
    """Set deceleration (register 5)."""
    self.write_register(5, value)

set_feedback_id

set_feedback_id(value: int) -> None

Set feedback ID (register 7).

Source code in damiao_motor/motor.py
def set_feedback_id(self, value: int) -> None:
    """Set feedback ID (register 7)."""
    self.write_register(7, value)

set_gear_efficiency

set_gear_efficiency(value: float) -> None

Set gear torque efficiency (register 30).

Source code in damiao_motor/motor.py
def set_gear_efficiency(self, value: float) -> None:
    """Set gear torque efficiency (register 30)."""
    self.write_register(30, value)

set_limits

set_limits(*, p_min: Optional[float] = None, p_max: Optional[float] = None, v_min: Optional[float] = None, v_max: Optional[float] = None, t_min: Optional[float] = None, t_max: Optional[float] = None) -> None

Update only the specified P/V/T limits. Omitted keys are left unchanged. kp and kd are fixed (KP_MIN/KP_MAX, KD_MIN/KD_MAX).

Source code in damiao_motor/motor.py
def set_limits(
    self,
    *,
    p_min: Optional[float] = None,
    p_max: Optional[float] = None,
    v_min: Optional[float] = None,
    v_max: Optional[float] = None,
    t_min: Optional[float] = None,
    t_max: Optional[float] = None,
) -> None:
    """Update only the specified P/V/T limits. Omitted keys are left unchanged. kp and kd are fixed (KP_MIN/KP_MAX, KD_MIN/KD_MAX)."""
    if p_min is not None:
        self._p_min = p_min
    if p_max is not None:
        self._p_max = p_max
    if v_min is not None:
        self._v_min = v_min
    if v_max is not None:
        self._v_max = v_max
    if t_min is not None:
        self._t_min = t_min
    if t_max is not None:
        self._t_max = t_max

set_maximum_speed

set_maximum_speed(value: float) -> None

Set maximum speed (register 6).

Source code in damiao_motor/motor.py
def set_maximum_speed(self, value: float) -> None:
    """Set maximum speed (register 6)."""
    self.write_register(6, value)

set_motor_type

set_motor_type(motor_type: str) -> None

Update motor type and P/V/T limits from a preset. Validates against MOTOR_TYPE_PRESETS.

Source code in damiao_motor/motor.py
def set_motor_type(self, motor_type: str) -> None:
    """Update motor type and P/V/T limits from a preset. Validates against MOTOR_TYPE_PRESETS."""
    if motor_type not in MOTOR_TYPE_PRESETS:
        raise ValueError(
            f"Unknown motor_type: {motor_type!r}. "
            f"Known: {list(MOTOR_TYPE_PRESETS.keys())}"
        )
    base = dict(MOTOR_TYPE_PRESETS[motor_type])
    for k in _LIMITS_KEYS:
        setattr(self, f"_{k}", base[k])
    self.motor_type = motor_type

set_over_current_protection

set_over_current_protection(value: float) -> None

Set over-current protection value (register 3).

Source code in damiao_motor/motor.py
def set_over_current_protection(self, value: float) -> None:
    """Set over-current protection value (register 3)."""
    self.write_register(3, value)

set_over_temperature_protection

set_over_temperature_protection(value: float) -> None

Set over-temperature protection value (register 2).

Source code in damiao_motor/motor.py
def set_over_temperature_protection(self, value: float) -> None:
    """Set over-temperature protection value (register 2)."""
    self.write_register(2, value)

set_overvoltage_protection

set_overvoltage_protection(value: float) -> None

Set overvoltage protection value (register 29).

Source code in damiao_motor/motor.py
def set_overvoltage_protection(self, value: float) -> None:
    """Set overvoltage protection value (register 29)."""
    self.write_register(29, value)

set_p_limits

set_p_limits(p_min: float, p_max: float) -> None

Set position limits used for encode/decode (MIT mode).

Source code in damiao_motor/motor.py
def set_p_limits(self, p_min: float, p_max: float) -> None:
    """Set position limits used for encode/decode (MIT mode)."""
    self._p_min, self._p_max = p_min, p_max

set_position_loop_ki

set_position_loop_ki(value: float) -> None

Set position loop integral gain Ki (register 28).

Source code in damiao_motor/motor.py
def set_position_loop_ki(self, value: float) -> None:
    """Set position loop integral gain Ki (register 28)."""
    self.write_register(28, value)

set_position_loop_kp

set_position_loop_kp(value: float) -> None

Set position loop proportional gain Kp (register 27).

Source code in damiao_motor/motor.py
def set_position_loop_kp(self, value: float) -> None:
    """Set position loop proportional gain Kp (register 27)."""
    self.write_register(27, value)

set_position_mapping_range

set_position_mapping_range(value: float) -> None

Set position mapping range (register 21).

Source code in damiao_motor/motor.py
def set_position_mapping_range(self, value: float) -> None:
    """Set position mapping range (register 21)."""
    self.write_register(21, value)

set_receive_id

set_receive_id(value: int) -> None

Set receive ID (register 8).

Source code in damiao_motor/motor.py
def set_receive_id(self, value: int) -> None:
    """Set receive ID (register 8)."""
    self.write_register(8, value)

set_speed_loop_damping

set_speed_loop_damping(value: float) -> None

Set speed loop damping coefficient (register 31).

Source code in damiao_motor/motor.py
def set_speed_loop_damping(self, value: float) -> None:
    """Set speed loop damping coefficient (register 31)."""
    self.write_register(31, value)

set_speed_loop_enhancement

set_speed_loop_enhancement(value: float) -> None

Set speed loop enhancement coefficient (register 34).

Source code in damiao_motor/motor.py
def set_speed_loop_enhancement(self, value: float) -> None:
    """Set speed loop enhancement coefficient (register 34)."""
    self.write_register(34, value)

set_speed_loop_filter_bandwidth

set_speed_loop_filter_bandwidth(value: float) -> None

Set speed loop filter bandwidth (register 32).

Source code in damiao_motor/motor.py
def set_speed_loop_filter_bandwidth(self, value: float) -> None:
    """Set speed loop filter bandwidth (register 32)."""
    self.write_register(32, value)

set_speed_loop_ki

set_speed_loop_ki(value: float) -> None

Set speed loop integral gain Ki (register 26).

Source code in damiao_motor/motor.py
def set_speed_loop_ki(self, value: float) -> None:
    """Set speed loop integral gain Ki (register 26)."""
    self.write_register(26, value)

set_speed_loop_kp

set_speed_loop_kp(value: float) -> None

Set speed loop proportional gain Kp (register 25).

Source code in damiao_motor/motor.py
def set_speed_loop_kp(self, value: float) -> None:
    """Set speed loop proportional gain Kp (register 25)."""
    self.write_register(25, value)

set_speed_mapping_range

set_speed_mapping_range(value: float) -> None

Set speed mapping range (register 22).

Source code in damiao_motor/motor.py
def set_speed_mapping_range(self, value: float) -> None:
    """Set speed mapping range (register 22)."""
    self.write_register(22, value)

set_t_limits

set_t_limits(t_min: float, t_max: float) -> None

Set torque limits used for encode/decode (MIT mode).

Source code in damiao_motor/motor.py
def set_t_limits(self, t_min: float, t_max: float) -> None:
    """Set torque limits used for encode/decode (MIT mode)."""
    self._t_min, self._t_max = t_min, t_max

set_timeout_alarm

set_timeout_alarm(value: int) -> None

Set timeout alarm time (register 9).

Parameters:

Name Type Description Default
value int

Timeout value in register units (1 unit = 50 microseconds)

required
Note

This method writes the raw register value. For convenience, use set_can_timeout() which accepts milliseconds and handles the conversion automatically.

Conversion: 1 register unit = 50 microseconds To convert from milliseconds: register_value = timeout_ms × 20

Source code in damiao_motor/motor.py
def set_timeout_alarm(self, value: int) -> None:
    """
    Set timeout alarm time (register 9).

    Args:
        value: Timeout value in register units (1 unit = 50 microseconds)

    Note:
        This method writes the raw register value. For convenience, use `set_can_timeout()`
        which accepts milliseconds and handles the conversion automatically.

        Conversion: 1 register unit = 50 microseconds
        To convert from milliseconds: register_value = timeout_ms × 20
    """
    self.write_register(9, value)

set_torque_coefficient

set_torque_coefficient(value: float) -> None

Set torque coefficient (register 1).

Source code in damiao_motor/motor.py
def set_torque_coefficient(self, value: float) -> None:
    """Set torque coefficient (register 1)."""
    self.write_register(1, value)

set_torque_mapping_range

set_torque_mapping_range(value: float) -> None

Set torque mapping range (register 23).

Source code in damiao_motor/motor.py
def set_torque_mapping_range(self, value: float) -> None:
    """Set torque mapping range (register 23)."""
    self.write_register(23, value)

set_under_voltage_protection

set_under_voltage_protection(value: float) -> None

Set under-voltage protection value (register 0).

Source code in damiao_motor/motor.py
def set_under_voltage_protection(self, value: float) -> None:
    """Set under-voltage protection value (register 0)."""
    self.write_register(0, value)

set_v_limits

set_v_limits(v_min: float, v_max: float) -> None

Set velocity limits used for encode/decode (MIT mode).

Source code in damiao_motor/motor.py
def set_v_limits(self, v_min: float, v_max: float) -> None:
    """Set velocity limits used for encode/decode (MIT mode)."""
    self._v_min, self._v_max = v_min, v_max

set_zero_command

set_zero_command() -> None

Send zero command (pos=0, vel=0, torq=0, kp=0, kd=0).

Source code in damiao_motor/motor.py
def set_zero_command(self) -> None:
    """Send zero command (pos=0, vel=0, torq=0, kp=0, kd=0)."""
    self.send_cmd(target_position=0.0, target_velocity=0.0, stiffness=0.0, damping=0.0, feedforward_torque=0.0)

set_zero_position

set_zero_position() -> None

Set the current output shaft position to zero.

Source code in damiao_motor/motor.py
def set_zero_position(self) -> None:
    """Set the current output shaft position to zero."""
    self.send_raw(self.encode_save_position_zero_msg())

store_parameters

store_parameters() -> None

Store all parameters to flash memory. After successful write, all parameters will be written to the chip.

Source code in damiao_motor/motor.py
def store_parameters(self) -> None:
    """
    Store all parameters to flash memory.
    After successful write, all parameters will be written to the chip.
    """
    self._send_register_cmd(0xAA, 0x01)

write_register

write_register(rid: int, value: float | int) -> None

Write a value to a register.

Parameters:

Name Type Description Default
rid int

Register ID (0-81)

required
value float | int

Value to write (float or int)

required

Raises:

Type Description
KeyError

If register ID is not in the register table

ValueError

If register is read-only or value is out of range

Source code in damiao_motor/motor.py
def write_register(self, rid: int, value: float | int) -> None:
    """
    Write a value to a register.

    Args:
        rid: Register ID (0-81)
        value: Value to write (float or int)

    Raises:
        KeyError: If register ID is not in the register table
        ValueError: If register is read-only or value is out of range
    """
    # Check if register exists in table
    if rid not in REGISTER_TABLE:
        raise KeyError(f"Register {rid} not found in register table")

    reg_info = REGISTER_TABLE[rid]

    # Check if register is writable
    if reg_info.access != "RW":
        raise ValueError(f"Register {rid} ({reg_info.variable}) is read-only (access: {reg_info.access})")

    # Encode value to 4 bytes using data type from register table
    if reg_info.data_type == "float":
        data_bytes = struct.pack("<f", float(value))
    elif reg_info.data_type == "uint32":
        data_bytes = struct.pack("<I", int(value))
    else:
        raise ValueError(f"Unknown data_type: {reg_info.data_type} for register {rid}")

    # Send write command
    self._send_register_cmd(0x55, rid, data_bytes)