jonathancamberos
LeetCode: Heaps
32 min read
#data structures and algorithms

Heaps intro

LeetCode problems with heap solutions.

What is a Heap

Heaps are specialized complete binary tree structures used to prioritize data.

A heap maintains a partial order property:

  1. MinHeap: Parent nodes are less than or equal to their children (smallest element at root)
  2. MaxHeap: Parent nodes are greater than or equal to their children (largest element at root)

The tree is complete: all levels are fully filled except possibly the last, which is filled left to right.

Peek(): efficient access to the min/max element in O(1) time

Insert()/Pop(): heap edit in O(log n) time
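
As a minimal sketch of these three operations, the snippet below uses Python's standard `heapq` module, which treats a plain list as a binary min-heap. The sample values are made up for illustration.

```python
import heapq

# heapq keeps the smallest element at index 0 of an ordinary list
heap = []
for value in [9, 3, 12, 5]:
    heapq.heappush(heap, value)   # Insert: O(log n)

smallest = heap[0]                # Peek: O(1), the root sits at index 0
popped = heapq.heappop(heap)      # Pop: O(log n), removes and returns the root
new_root = heap[0]                # next smallest element is now the root
```

`heapq` only provides a min-heap, which is why the maxHeap examples in this post store negated values.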

Heap Characteristics

Heaps are characterized by:

  1. Nodes: Contain values arranged by heap property
  2. Complete Tree: Shape property ensures tree is always balanced
  3. Heap Property: Parent value ≤ children values (minHeap) or parent value ≥ children values (maxHeap)
  4. Nothing beyond heap property: No ordering between siblings or subtrees maintained, other than parent child from heap property
  5. Array: Implemented as array for space and cache efficiency

Heap Representation

Tree heap represented as an array:

    Array Form
    Index:  0   1   2   3   4   5   6
    Value: [3,  5,  8,  9, 10, 12, 15]

    Tree Form

            (0)3
            /     \
        (1)5       (2)8
        /   \      /   \
    (3)9  (4)10 (5)12 (6)15
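
The mapping between the array form and the tree form is plain index arithmetic. The sketch below assumes 0-based indexing, as in the diagram above; the helper names `parent`, `left`, `right`, and `is_min_heap` are illustrative and not part of any library.

```python
def parent(i: int) -> int:
    # index of the parent of node i (not meaningful for the root, i == 0)
    return (i - 1) // 2

def left(i: int) -> int:
    # index of the left child of node i
    return 2 * i + 1

def right(i: int) -> int:
    # index of the right child of node i
    return 2 * i + 2

def is_min_heap(arr: list[int]) -> bool:
    # every non-root node must be >= its parent
    return all(arr[parent(i)] <= arr[i] for i in range(1, len(arr)))

heap = [3, 5, 8, 9, 10, 12, 15]
children_of_1 = (heap[left(1)], heap[right(1)])   # children of the node holding 5
```

Because the tree is complete, no index is ever skipped, so the array needs no pointers or padding.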

Heap IRL

  1. Priority Queues: scheduling, bandwidth management
  2. Graph algorithms: Dijkstra's shortest path, Prim's MST
  3. Event simulation: process events by priority/time
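
To make the priority queue use case concrete, here is a small sketch of a task queue built on `heapq`. The class name `TaskQueue`, the task names, and the convention that a lower number means more urgent are all assumptions made for this example. An insertion counter breaks ties so that tasks with equal priority come out in the order they were added.

```python
import heapq
import itertools

class TaskQueue:
    """Hypothetical scheduler queue: lower priority number = more urgent."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()   # tie-breaker for equal priorities

    def push(self, priority: int, name: str) -> None:
        # tuples compare element by element: priority first, then insertion order
        heapq.heappush(self._heap, (priority, next(self._order), name))

    def pop(self) -> str:
        _priority, _order, name = heapq.heappop(self._heap)
        return name

queue = TaskQueue()
queue.push(2, "write report")
queue.push(1, "fix outage")
queue.push(2, "review PR")
served = [queue.pop() for _ in range(3)]
```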

Heap Application: Top K Element Extraction

We can maintain a heap to quickly retrieve the top K largest or smallest elements without fully sorting the input.

Ex: Find the kth largest element in array

    import heapq

    def kthLargest(nums, k):
        heap = []
        for num in nums:
            heapq.heappush(heap, num)
            if len(heap) > k:
                heapq.heappop(heap)  # Keep only top K elements
        return heap[0]

    # Example: kthLargest([3, 2, 1, 5, 6, 4], 2) -> 5
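
For comparison, the standard library already wraps this bounded-heap idea in `heapq.nlargest`. The helper name below is hypothetical; it is only a sketch for cross-checking the hand-written version, not a replacement for understanding it.

```python
import heapq

def kth_largest_nlargest(nums: list[int], k: int) -> int:
    # nlargest returns the k largest values in descending order,
    # so the last entry is the kth largest
    return heapq.nlargest(k, nums)[-1]

result = kth_largest_nlargest([3, 2, 1, 5, 6, 4], 2)
```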

Heap Application: Dual Heap Balancing Representing Secondary Property

We can use two heaps, a maxHeap for the lower half of the values and a minHeap for the upper half, to maintain a property while adding and removing elements. This pattern supports quick lookups of medians, balance points, or range constraints.

Ex: Maintain median of stream.

import heapq

class MedianFinder:
    def __init__(self):
        self.low = []   # Max-heap (invert values)
        self.high = []  # Min-heap
    
    def addNum(self, num: int) -> None:
        heapq.heappush(self.low, -num)
        heapq.heappush(self.high, -heapq.heappop(self.low))
        if len(self.low) < len(self.high):
            heapq.heappush(self.low, -heapq.heappop(self.high))
    
    def findMedian(self) -> float:
        if len(self.low) > len(self.high):
            return -self.low[0]
        return (-self.low[0] + self.high[0]) / 2        
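
The same two-heap balancing can be traced on a short stream. The function below is a self-contained sketch (the name `running_medians` and the sample stream are illustrative) that applies the push, rebalance, and lookup steps after every element and records the median each time.

```python
import heapq

def running_medians(stream: list[int]) -> list[float]:
    low, high = [], []   # low: max-heap via negated values, high: min-heap
    medians = []
    for num in stream:
        # push to low, then move low's largest value over to high
        heapq.heappush(low, -num)
        heapq.heappush(high, -heapq.heappop(low))
        # keep low at least as large as high
        if len(low) < len(high):
            heapq.heappush(low, -heapq.heappop(high))
        # odd count: median is the root of low; even count: average of both roots
        if len(low) > len(high):
            medians.append(float(-low[0]))
        else:
            medians.append((-low[0] + high[0]) / 2)
    return medians

medians = running_medians([5, 15, 1, 3])   # [5.0, 10.0, 5.0, 4.0]
```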

Heap Application: K Way Merge for Sorted Streams

A minHeap can efficiently merge multiple sorted lists or streams by always extracting the next smallest element across all inputs.

Ex: Merge k sorted lists into one sorted list

    import heapq

    def mergeKSorted(lists):
        heap = []
        for i, lst in enumerate(lists):
            if lst:
                heapq.heappush(heap, (lst[0], i, 0))
        
        result = []
        while heap:
            val, list_idx, elem_idx = heapq.heappop(heap)
            result.append(val)
            if elem_idx + 1 < len(lists[list_idx]):
                heapq.heappush(heap, (lists[list_idx][elem_idx+1], list_idx, elem_idx+1))
        return result

    # mergeKSorted([[1,4,5],[1,3,4],[2,6]]) -> [1,1,2,3,4,4,5,6]   
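
Python's standard library also ships a heap-based k-way merge, `heapq.merge`, which takes already sorted iterables and returns a lazy iterator. A short sketch on the same input:

```python
import heapq

# heapq.merge yields items lazily, so wrap it in list() to materialize the result
merged = list(heapq.merge([1, 4, 5], [1, 3, 4], [2, 6]))
```

Since the result is produced lazily, this form also suits streams that are too large to hold in memory at once.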

Heap Application: Best First Search or Breadth First Search with Priority

A heap can drive a search algorithm that always expands the 'best' candidate first: A*, Dijkstra's, Prim's, etc. In a graph with non-negative weights, a minHeap can be used to expand the closest node first. This ensures that the next node popped from the heap has the smallest known distance from the source.

Ex: Dijkstra's Algorithm MinHeap

import heapq

def dijkstra(graph, start):
    
    # Graph: Node -> list of (neighbor, weight) tuples
    # start: Starting node for shortest path search

    # Result: 
    # dist: shortest distance from start to all other nodes

    # Init all distances to infinity, 
    # except start node which has distance 0
    dist = {node: float('inf') for node in graph}
    dist[start] = 0

    # minHeap priority queue stores tuples of (distance, node)
    # iterate from start node and commence exploring neighbors
    heap = [(0, start)]

    # while there are nodes to process in heap
    while heap:
        
        # grab node with smallest known distance from start
        d, node = heapq.heappop(heap)

        # Skip if we have already explored a shorter
        # path to the current node
        if d > dist[node]:
            continue

        # Check each neighbor and see if we have found 
        # a shorter path through the current node
        for nei, w in graph[node]:
            
            # grab new distance
            nd = d + w

            # if shorter path found, update and push to heap
            if nd < dist[nei]:
                dist[nei] = nd
                heapq.heappush(heap, (nd, nei))

    # overall: time complexity O((V + E) log V)
    # overall: space complexity O(V + E)
    return dist
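
As a usage sketch, the variant below runs the same heap-driven loop on a small made-up graph and additionally records each node's predecessor so the shortest path itself can be rebuilt. The `prev` map, the function name `dijkstra_with_path`, and the sample graph are additions for illustration, not part of the version above.

```python
import heapq

def dijkstra_with_path(graph, start, target):
    dist = {node: float('inf') for node in graph}
    dist[start] = 0
    prev = {}                      # prev[node] = node we arrived from
    heap = [(0, start)]

    while heap:
        d, node = heapq.heappop(heap)
        if d > dist[node]:         # stale heap entry, a shorter path was found
            continue
        for nei, w in graph[node]:
            nd = d + w
            if nd < dist[nei]:
                dist[nei] = nd
                prev[nei] = node
                heapq.heappush(heap, (nd, nei))

    if dist[target] == float('inf'):
        return dist[target], []    # target unreachable

    # walk predecessors back from target to start
    path = [target]
    while path[-1] != start:
        path.append(prev[path[-1]])
    path.reverse()
    return dist[target], path

graph = {
    "A": [("B", 1), ("C", 4)],
    "B": [("C", 2), ("D", 5)],
    "C": [("D", 1)],
    "D": [],
}
distance, path = dijkstra_with_path(graph, "A", "D")   # 4, A -> B -> C -> D
```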

Heap Application: Kth element Within Iterating Sliding Window

A heap can track max/min within a sliding window efficiently when combined with lazy deletion or index tracking.

Ex: Find the maximum in each sliding window

    import heapq

    def max_sliding_window(nums, k):

        # maxHeap via minHeap with negative values
        # initialize with first k elements
        heap = [(-nums[i], i) for i in range(k)]
        heapq.heapify(heap)

        # root of heap is the largest in the current window
        res = [-heap[0][0]]
        
        # iterate sliding window forward
        for i in range(k, len(nums)):

            # push new element to heap
            heapq.heappush(heap, (-nums[i], i))

            # remove top elements while they are outside the current window
            while heap[0][1] <= i - k:
                heapq.heappop(heap)

            # after removing out of window elements, heap root is max
            # within the current window
            res.append(-heap[0][0])

        return res

Heap Application: Interval Scheduling Optimization

A minHeap can track the earliest finishing times among active intervals, enabling optimal scheduling of jobs or meetings without conflicts.

Ex: Find the minimum number of meeting rooms required

    import heapq

    def minMeetingRooms(intervals):

        # Sort intervals by their start time,
        # ensures we process meetings in chronological order
        intervals.sort(key=lambda x: x[0])

        # minHeap to track end time of ongoing meetings,
        # storing the earliest ending meeting at the top
        heap = []

        # iterate over meetings in earliest starting order
        for start, end in intervals:

            # if heap is not empty, and the earliest ending meeting
            # ends before or exactly when the new meeting starts,
            # then we can remove the earliest ending meeting
            # (essentially freeing up a room)
            if heap and heap[0] <= start:
                # pop meeting that ended the earliest
                heapq.heappop(heap)

            # 'start' the current meeting by pushing it to the heap
            heapq.heappush(heap, end)

        # each meeting pops at most one entry before pushing its own,
        # so the heap never shrinks; its final size is the peak number of
        # concurrent meetings, i.e. the number of rooms required
        return len(heap)

    # Example: 
    # minMeetingRooms([[0,30],[5,10],[15,20]]) -> 2

703. Kth Largest Element in a Stream ::1:: - Easy

Topics: Tree, Design, Binary Search Tree, Heap (Priority Queue), Binary Tree, Data Stream

Intro

You are part of a university admissions office and need to keep track of the kth highest test score from applicants in real time. This helps to determine cut-off marks for interviews and admissions dynamically as new applicants submit their scores. You are tasked to implement a class which, for a given integer k, maintains a stream of test scores and continuously returns the kth highest test score after a new score has been submitted. More specifically, we are looking for the kth highest score in the sorted list of all scores.

Implement the KthLargest class:

  1. KthLargest(int k, int[] nums): Initializes the object with the integer k and the stream of test scores nums.
  2. int add(int val): Adds a new test score val to the stream and returns the kth largest element in the pool of test scores so far.

Example Input / Output

Input: ["KthLargest", "add", "add", "add", "add", "add"] [[3, [4, 5, 8, 2]], [3], [5], [10], [9], [4]]
Output: [null, 4, 5, 5, 8, 8]

Input: ["KthLargest", "add", "add", "add", "add"] [[4, [7, 7, 7, 7, 8, 3]], [2], [10], [9], [9]]
Output: [null, 7, 7, 7, 8]

Constraints:

0 ≤ nums.length ≤ 10^4

1 ≤ k ≤ nums.length + 1

-10^4 ≤ nums[i] ≤ 10^4

-10^4 ≤ val ≤ 10^4

At most 10^4 calls will be made to add.

Abstraction

Implement a heap that will keep track of the kth highest score for a data stream.

Space & Time Complexity

Solution | Time Complexity | Space Complexity | Time Remark | Space Remark
Bug | Error

Brute Force:

Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks

Find the Bug:

Solution 1: Min Heap of Size k - Heap/Heap

class KthLargest:

    def __init__(self, k: int, nums: list[int]):
        
        # Note:
        # 1.  MinHeap of size k containing the k largest elements seen so far.
        # Result: Top k scores saved, with lowest top score at root

        # size of minHeap     
        self.k = k

        # minHeap
        self.minHeap = []
        
        # stream input data
        for num in nums:
            self.add(num)
    
    # time complexity:
    # space complexity: 
    def add(self, val: int) -> int:

        # Note:
        # 1. if minHeap is less than capacity, simply push
        # 2. if minHeap is out of space, push only if new score beats lowest score (root)

        # heap still has space, simply push
        if len(self.minHeap) < self.k:
            heapq.heappush(self.minHeap, val)

        # heap is out of space, check if valid add
        else:
            # push if new score beats lowest high score (root)
            smallestTopScore = self.minHeap[0]
            if val > smallestTopScore:
                heapq.heapreplace(self.minHeap, val)
        
        # return kth largest score (root of minHeap)
        return self.minHeap[0]

    # overall: time complexity O(n log k) to build, O(log k) per add
    # overall: space complexity O(k)
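
The first example from the problem statement can be replayed with a compact variant of the same idea. This is only a sketch: the class name `KthLargestCompact` is made up, and it swaps the explicit compare-then-replace for `heapq.heappushpop`, which pushes the new value and pops the smallest in one call.

```python
import heapq

class KthLargestCompact:
    def __init__(self, k: int, nums: list[int]):
        self.k = k
        # keep only the k largest initial scores, arranged as a min-heap
        self.heap = heapq.nlargest(k, nums)
        heapq.heapify(self.heap)

    def add(self, val: int) -> int:
        if len(self.heap) < self.k:
            heapq.heappush(self.heap, val)
        else:
            # push val, then drop the smallest; heap size stays at k
            heapq.heappushpop(self.heap, val)
        return self.heap[0]

tracker = KthLargestCompact(3, [4, 5, 8, 2])
outputs = [tracker.add(v) for v in [3, 5, 10, 9, 4]]   # [4, 5, 5, 8, 8]
```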

1046. Last Stone Weight ::1:: - Easy

Topics: Array, Heap (Priority Queue)

Intro

You are given an array of integers stones where stones[i] is the weight of the ith stone. We are playing a game with the stones. On each turn, we choose the heaviest two stones and smash them together. Suppose the heaviest two stones have weights x and y with x ≤ y. The result of this smash is:

  1. If x == y, both stones are destroyed.
  2. If x != y, the stone of weight x is destroyed, and the stone of weight y has new weight y - x.

At the end of the game, there is at most one stone left. Return the weight of the last remaining stone. If there are no stones left, return 0.

Example Input / Output

Input: stones = [2,7,4,1,8,1]
Output: 1

Input: stones = [1]
Output: 1

Constraints:

1 ≤ stones.length ≤ 30

1 ≤ stones[i] ≤ 1000

Abstraction

Stone smashing game. Given some amount of stones, a round consists of smashing the two heaviest stones together. If they are the same weight both stones are destroyed. If one is heavier than the other, the heavier stone loses weight equal to the weight of the less heavy stone.

Space & Time Complexity

Solution | Time Complexity | Space Complexity | Time Remark | Space Remark
Bug | Error

Brute Force:

Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks

Find the Bug:

Solution 1: Max Heap - Heap/Heap

    def lastStoneWeight(self, stones: list[int]) -> int:
        # Note:
        # 1. MaxHeap via MinHeap with negative values
        # 2. Continue smashing pairs of stones until heap size 1
        # Result: final stone weight or all stones destroyed
       
        # turn list to negative numbers
        maxHeap = [-stone for stone in stones]

        # transform list into heap, in place, in linear time
        heapq.heapify(maxHeap)

        # remove top two stones until one stone left 
        while len(maxHeap) > 1:

            # grab two heaviest stones
            first = -(heapq.heappop(maxHeap))
            second = -(heapq.heappop(maxHeap))

            # push if partial stone remains
            if first != second:
                remainingStoneWeight = -(first - second)
                heapq.heappush(maxHeap, remainingStoneWeight)

        # check if stone remains
        res = -maxHeap[0] if maxHeap else 0

        # overall: time complexity O(n log n)
        # overall: space complexity O(n)
        return res
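
To see the rounds play out, the sketch below records every smash as a (heavier, lighter, remainder) tuple for the first example. The function name `smash_rounds` and the tuple layout are assumptions made for this trace.

```python
import heapq

def smash_rounds(stones: list[int]):
    heap = [-s for s in stones]       # max-heap via negated weights
    heapq.heapify(heap)
    rounds = []
    while len(heap) > 1:
        y = -heapq.heappop(heap)      # heaviest stone
        x = -heapq.heappop(heap)      # second heaviest stone
        rounds.append((y, x, y - x))
        if y != x:
            heapq.heappush(heap, -(y - x))
    last = -heap[0] if heap else 0
    return rounds, last

rounds, last = smash_rounds([2, 7, 4, 1, 8, 1])
# rounds: (8, 7, 1), (4, 2, 2), (2, 1, 1), (1, 1, 0); last stone weighs 1
```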

973. K Closest Points to Origin ::2:: - Medium

Topics: Array, Math, Divide and Conquer, Geometry, Sorting, Heap (Priority Queue), Quickselect

Intro

Given an array of points where points[i] = [x_i, y_i] represents a point on the X-Y plane and an integer k, return the k closest points to the origin (0, 0). The distance between two points on the X-Y plane is the Euclidean distance: sqrt((x_1 - x_2)^2 + (y_1 - y_2)^2). You may return the answer in any order. The answer is guaranteed to be unique (except for the order that it is in).

Example Input / Output

Input: points = [[1,3],[-2,2]], k = 1
Output: [[-2,2]]

Input: points = [[3,3],[5,-1],[-2,4]], k = 2
Output: [[3,3],[-2,4]]

Constraints:

1 ≤ k ≤ points.length ≤ 10^4

-10^4 ≤ x_i, y_i ≤ 10^4

Abstraction

Given a list of points on an X-Y plane, find the k closest points to the origin.

Space & Time Complexity

Solution | Time Complexity | Space Complexity | Time Remark | Space Remark
Bug | Error

Brute Force:

Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks

Find the Bug:

Solution 1: Max Heap of size k - Heap/Heap

    def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
        # Note:
        # 1. We want the k points with the smallest Euclidean distances from (0, 0).
        # 2. Instead of computing sqrt(x^2 + y^2), we can use the squared distance x^2 + y^2
        #    because sqrt is monotonic and doesn't change order.
        # 3. Approach:
        #    - Use a max-heap (store negative distances) of size k.
        #    - Iterate over points:
        #         if heap size < k: push (-distance, point)
        #         else if current distance < largest distance in heap: pop and push
        #    - This ensures we always keep k closest points in O(n log k) time.

        # (-distance, [x, y])
        maxHeap = []

        # add all distances to maxHeap
        for x, y in points:

            # square distance
            dist = x * x + y * y 

            # if maxHeap has space, simply push
            if len(maxHeap) < k:
                heapq.heappush(maxHeap, (-dist, [x, y]))

            # if maxHeap is full, check if push is required
            else:

                # if closer than farthest point
                if dist < -maxHeap[0][0]:
                    heapq.heapreplace(maxHeap, (-dist, [x, y]))
        
        # grab list of coords
        res = [xy for (_, xy) in maxHeap]

        # overall: time complexity O(n log k)
        # overall: space complexity O(k)
        return res
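
As a cross-check for the bounded max-heap, `heapq.nsmallest` with a key function gives the same selection in one call. The helper name is hypothetical, and unlike the solution above it returns the points ordered by distance.

```python
import heapq

def k_closest_nsmallest(points: list[list[int]], k: int) -> list[list[int]]:
    # squared distance keeps the ordering of the true Euclidean distance
    return heapq.nsmallest(k, points, key=lambda p: p[0] * p[0] + p[1] * p[1])

closest = k_closest_nsmallest([[3, 3], [5, -1], [-2, 4]], 2)
```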

Solution 2: QuickSelect - Heap/Heap

    def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
        
        # Note:
        # Classic Quick Select but with square distance
        # sqrt() is monotonic so we can ignore it
        
        # squared distance to origin
        def dist(i: int) -> int:
            return points[i][0] ** 2 + points[i][1] ** 2
        
        def partition(left, right, randPivotIndex) -> int:

            # prepare partition
            randPivotDist = dist(randPivotIndex)
            points[randPivotIndex], points[right] = points[right], points[randPivotIndex]
            leftPartition = left
            
            # partition all closer points to left
            for i in range(left, right):

                # if point is closer to origin (lower)
                if dist(i) < randPivotDist:
                    points[leftPartition], points[i] = points[i], points[leftPartition]
                    leftPartition += 1
            
            # restore pivot
            points[right], points[leftPartition] = points[leftPartition], points[right]

            # return
            return leftPartition
        
        def quickselect(left, right, finalIndex):
            
            # Base case:
            if left >= right:
                return
            
            # random pivot and result
            randPivotIndex = random.randint(left, right)
            resPivot = partition(left, right, randPivotIndex)
            
            # target check
            if resPivot == finalIndex:
                return
            
            elif resPivot < finalIndex: 
                quickselect(resPivot + 1, right, finalIndex)

            else:
                quickselect(left, resPivot - 1, finalIndex)
            
        
        # boundary
        n = len(points)

        # quickSelect on bounds
        quickselect(0, n - 1, k)

        # grab lowest k points
        res = points[:k]

        # overall: time complexity O(n) average, O(n^2) worst case
        # overall: space complexity O(log n) average recursion depth, O(n) worst case
        return res

215. Kth Largest Element in an Array ::2:: - Medium

Topics: Array, Divide and Conquer, Sorting, Heap (Priority Queue), Quickselect

Intro

Given an integer array nums and an integer k, return the kth largest element in the array. Note that it is the kth largest element in the sorted order, not the kth distinct element. Can you solve it without sorting?

Example Input / Output

Input: nums = [3,2,1,5,6,4], k = 2
Output: 5

Input: nums = [3,2,3,1,2,4,5,5,6], k = 4
Output: 4

Constraints:

1 ≤ k ≤ nums.length ≤ 10^5

-10^4 ≤ nums[i] ≤ 10^4

Abstraction

Given an unsorted array, find the kth largest element in sorted order.

Space & Time Complexity

Solution | Time Complexity | Space Complexity | Time Remark | Space Remark
Bug | Error

Brute Force:

Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks

Find the Bug:

Solution 1: Min Heap of Size k - Heap/Heap

    def findKthLargest(self, nums: list[int], k: int) -> int:
        # Note:
        # 1. MinHeap of size k
        # Result: root of minHeap contains kth largest element
        
        # heap
        minHeap = []
        
        # for each element
        for num in nums:

            # if heap has space, simply push
            if len(minHeap) < k:
                heapq.heappush(minHeap, num)

            # if heap has no space, check if push is necessary
            else:
                # push if new num is larger than smallest in minHeap,
                # this will force a new kth largest element
                if num > minHeap[0]:
                    heapq.heapreplace(minHeap, num)
        
        # kth largest is at the root of the minHeap
        kth = minHeap[0]

        # overall: time complexity O(n log k)
        # overall: space complexity O(k)
        return kth

Solution 2: Modified Ignore Duplicates Quick Select - Heap/Heap

    def findKthLargest(self, nums: list[int], k: int) -> int:
        
        # Note:
        # Optimized Quick Select which will remove 1 subList of duplicates per iteration 
        # 1. RandPivot
        # 2. Partition nums by creating larger, equal, or smaller than pivot subLists
        # 3. Recurse into the correct subList
        # Result: kth largest found while accounting for potential stream of duplicates

        # Empty check
        if not nums: 
            return
        
        # random pivot
        pivot = random.choice(nums)
        
        # Partition nums into three parts:
        # bigger: elements greater than pivot
        bigger = [num for num in nums if num > pivot]
        
        # if kth is within bigger, recurse
        if k <= len(bigger):
            return self.findKthLargest(bigger, k)
        
        # Duplicate Optimization 
        # equal: elements equal to pivot
        equal = [num for num in nums if num == pivot]
        
        # If k falls within the bigger + equal count,
        # the kth largest is one of the pivot's duplicates
        if k <= len(bigger) + len(equal):
            return equal[0]
        
        # kth must be within smaller list, recurse
        # update k to ignore length of bigger and equal list
        smaller = [num for num in nums if num < pivot]
        return self.findKthLargest(smaller, k-len(bigger)-len(equal))

621. Task Scheduler ::3:: - Medium

Topics: Array, Hash Table, Greedy, Sorting, Heap (Priority Queue), Counting, Computer Architecture Scheduling

Intro

You are given an array of CPU tasks, each labeled with a letter from A to Z, and a number n. Each CPU interval can be idle or allow the completion of one task. Tasks can be completed in any order, but there's a constraint: there has to be a gap of at least n intervals between two tasks with the same label. Return the minimum number of CPU intervals required to complete all tasks.

Example Input / Output

Input: tasks = ["A","A","A","B","B","B"], n = 2
Output: 8

Input: tasks = ["A","C","A","B","D","B"], n = 1
Output: 6

Input: tasks = ["A","A","A","B","B","B"], n = 3
Output: 10

Constraints:

1 ≤ tasks.length ≤ 10^5

tasks[i] is an uppercase English letter

0 ≤ n ≤ 100

Abstraction

Given a list of tasks marked by uppercase English letters and a minimum distance n between identical tasks, find how many cycles it would take to finish all tasks.

Space & Time Complexity

Solution | Time Complexity | Space Complexity | Time Remark | Space Remark
Bug | Error

Brute Force:

Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks

Find the Bug:

Solution 1: Highest Occurring Tasks into Greedy Math Formula - Heap/Heap

    def leastInterval(self, tasks: list[str], n: int) -> int:

        # Note:
        # 1. Highest occurring tasks + idle time determine the min scheduling
        # 2. Total number of tasks determine the min scheduling
        # Result: min scheduling for list of tasks 

        # Count frequency of each task (A-Z)
        freq = defaultdict(int)
        for task in tasks:
            freq[task] += 1
        
        # highest number of occurrences for single task
        maxFreq = max(freq.values())
        
        # number of tasks with highest number of occurrences
        maxCount = sum(1 for count in freq.values() if count == maxFreq)
        
        # The minimum time is based on arranging the most frequent tasks spaced by n:
        # ex:
        # A, B, C share freq count of 9
        # idle interval n = 5
        # 
        # so:
        # [A -> B -> C -> idle -> idle...] * 8 times
        # + [A -> B -> C] last group (notice no idles needed)

        # (maxFreq-1) -> 8 times = groups
        # (n+1) = length of group (n idle time + 1 for task occurrence)

        # + maxCount for last group
        intervals = (maxFreq - 1) * (n + 1) + maxCount
        
        # if totalTasks is higher than intervals, totalTasks is the lower bound
        # ie: A,B,C combo takes 51 
        #     however, rest of alphabet all has 4 count, leading to 119
        # we will take the 119
        # (in this case, we wont have to use idle)
        res = max(intervals, len(tasks))

        # overall: time complexity O(t), where t = len(tasks)
        # overall: space complexity O(1), at most 26 distinct task labels
        return res
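
The formula can be checked by hand against the three examples from the problem statement. With tasks A×3 and B×3 and n = 2: (3 - 1) * (2 + 1) + 2 = 8. With n = 3: (3 - 1) * (3 + 1) + 2 = 10. For A×2, B×2, C×1, D×1 and n = 1 the formula gives (2 - 1) * (1 + 1) + 2 = 4, which is below the 6 tasks, so the answer is 6. The standalone function below (its name is an assumption for this sketch) runs the same arithmetic.

```python
from collections import Counter

def least_interval_formula(tasks: list[str], n: int) -> int:
    counts = Counter(tasks).values()
    max_freq = max(counts)                                   # occurrences of the most frequent task
    max_count = sum(1 for c in counts if c == max_freq)      # tasks tied at that frequency
    framed = (max_freq - 1) * (n + 1) + max_count            # full groups plus the last partial group
    return max(framed, len(tasks))                           # never fewer slots than tasks

answers = [
    least_interval_formula(["A", "A", "A", "B", "B", "B"], 2),
    least_interval_formula(["A", "C", "A", "B", "D", "B"], 1),
    least_interval_formula(["A", "A", "A", "B", "B", "B"], 3),
]
```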

Solution 2: Max Heap Tick by Tick Simulation For Count Occurrences - Heap/Heap

    def leastInterval(self, tasks: list[str], n: int) -> int:

        # Note:
        # Simulates task scheduling process 
        # 1. MaxHeap picks current most frequent remaining task first
        # 2. Queue (FIFO) simulates cool down to store tasks that have been run
        # but must wait 'n' intervals before they can be scheduled again
        # 3. Each iteration represents 1 unit of CPU time
        # 4. Iterate until both MaxHeap and Queue are empty
        # Result: time is total CPU time needed, including idle intervals

        # counts
        freq = Counter(tasks)
        
        # MaxHeap with negative counts
        maxHeap = [-count for count in freq.values()]
        heapq.heapify(maxHeap)
        
        # total CPU time simulated
        time = 0
        
        # Queue to hold cooldown tasks: (ready_time, count)
        cooldown = deque()
        
        # until both are empty
        while maxHeap or cooldown:

            # 1 CPU tick
            time += 1
            
            # grab most frequent task
            if maxHeap:
                
                # remove most freq task from root, and decrement by 1 
                cnt = heapq.heappop(maxHeap) + 1
                if cnt != 0:
                    # put task into cool down,
                    # and calculate time until next available use
                    cooldown.append((time + n, cnt))
            
            # check if any task in cool down is ready to be added back to maxHeap
            if cooldown and cooldown[0][0] == time:

                # pop FIFO
                _, cnt = cooldown.popleft()

                # push task occurrence count back to maxHeap
                heapq.heappush(maxHeap, cnt)
        
        # overall: time complexity
        # overall: space complexity
        return time

Solution 3: Max Heap Time Jump Optimization Simulation for Count Occurrences - Heap/Heap

    def leastInterval(self, tasks: list[str], n: int) -> int:

        # Note:
        # Simulates task scheduling with a MaxHeap + cool down queue
        # 1. MaxHeap picks current most frequent remaining task first
        # 2. Queue (FIFO) simulates cool down to store tasks that have been run
        # but must wait 'n' intervals before they can be scheduled again
        # 3. if MaxHeap is empty but cool down queue is not, we 'jump' 
        #    directly to the next ready task instead of simulating idle ticks,
        #    skipping unnecessary +1 increments and speeding up the simulation
        # Result: time is total CPU time needed, including idle intervals,
        #         with fewer iterations
        
        
        # counts
        freq = Counter(tasks)
        
        # MaxHeap with negative counts
        maxHeap = [-count for count in freq.values()]
        heapq.heapify(maxHeap)

        # total CPU time simulated
        time = 0

        # Queue to hold cooldown tasks: [cnt, readyTime], where cnt is the negated remaining count
        q = deque()  

        # until both are empty
        while maxHeap or q:

            # CPU tick
            time += 1

            # if maxHeap is empty
            if not maxHeap:
                # time 'jump' to next cool down
                time = q[0][1]

            # maxHeap is non empty
            else:

                # remove most freq task from root, and decrement by 1 
                cnt = 1 + heapq.heappop(maxHeap)
                if cnt:
                    # put task into cool down,
                    # and calculate time until next available use 
                    q.append([cnt, time + n])

            # check if any task in cool down is ready to be added back to maxHeap
            if q and q[0][1] == time:
                
                # pop FIFO
                (cnt, idleTime) = q.popleft()

                heapq.heappush(maxHeap, cnt)

        # overall: time complexity
        # overall: space complexity
        return time

355. Design Twitter ::2:: - Medium

Topics: Hash Table, Linked List, Design, Heap (Priority Queue)

Intro

Design a simplified version of Twitter where users can post tweets, follow/unfollow another user, and are able to see the 10 most recent tweets in the user's news feed.

Implement the Twitter class:

  1. Twitter(): Initializes your twitter object.
  2. void postTweet(int userId, int tweetId): Composes a new tweet with ID tweetId by the user userId. Each call to this function will be made with a unique tweetId.
  3. List[Integer] getNewsFeed(int userId): Retrieves the 10 most recent tweet IDs in the user's news feed. Each item in the news feed must be posted by users who the user followed or by the user themself. Tweets must be ordered from most recent to least recent.
  4. void follow(int followerId, int followeeId): The user with ID followerId started following the user with ID followeeId.
  5. void unfollow(int followerId, int followeeId): The user with ID followerId started unfollowing the user with ID followeeId.

Example Input / Output

Input: ["Twitter", "postTweet", "getNewsFeed", "follow", "postTweet", "getNewsFeed", "unfollow", "getNewsFeed"] [[], [1, 5], [1], [1, 2], [2, 6], [1], [1, 2], [1]]
Output: [null, null, [5], null, null, [6, 5], null, [5]]

Constraints:

1 ≤ userId, followerId, followeeId ≤ 500

0 ≤ tweetId ≤ 10^4

All the tweets have unique IDs.

At most 3 * 10^4 calls will be made to postTweet, getNewsFeed, follow, and unfollow.

A user cannot follow themselves.

Abstraction

Design a miniTwitter that allows for efficient news feed and postTweet. Boils down to decisions on efficiently storing tweets and retrieving the 10 most recent ones. Also boils down to a merge k sorted lists question.

Space & Time Complexity

Solution | Time Complexity | Space Complexity | Time Remark | Space Remark
Bug | Error

Brute Force:

Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks

Find the Bug:

Solution 1: Store Tweets as Lists + Min Heap Merge - Heap/Heap

class Twitter:

    def __init__(self):

        # IRL Use Case: 
        # Balanced usage scenario users follow moderate number of accounts, tweet counts are not huge.

        # Note:
        # Tweets: Stores tweets as deques (lists) per user.
        # News Feed: Merges k sorted lists using a maxHeap to always pick the most
        #            recent tweet across the user and their followees.

        # Pros: Simple implementation, efficient for moderate tweet counts.
        # Cons: Heap merge has some overhead; memory grows with many tweets per user.

        self.followees = defaultdict(set)
        self.tweets = defaultdict(deque)
        self.time = 0
        self.FEED_SIZE = 10

    # time complexity:
    # space complexity:
    def postTweet(self, userId: int, tweetId: int) -> None:

        # Note:
        #

        self.time += 1
        self.tweets[userId].append((self.time, tweetId))

        if len(self.tweets[userId]) > 100:
            self.tweets[userId].popleft()

    # time complexity:
    # space complexity:
    def follow(self, followerId: int, followeeId: int) -> None:

        # Note:
        #

        if followerId != followeeId:
            self.followees[followerId].add(followeeId)

    # time complexity:
    # space complexity:
    def unfollow(self, followerId: int, followeeId: int) -> None:

        # Note:
        #

        self.followees[followerId].discard(followeeId)

    # time complexity:
    # space complexity:
    def getNewsFeed(self, userId: int) -> list[int]:

        # Note:
        #

        # heap stores (-timestamp, tweetId, userId, index_in_user_tweets)
        heap = []

        # The list of users to fetch tweets from: self + followees
        users = self.followees[userId] | {userId}
        
        # Init pointers for each user tweet list (start from newest)
        for u in users:

            if self.tweets[u]:

                # Index of last tweet in deque (newest)
                idx = len(self.tweets[u]) - 1
                timestamp, tweetId = self.tweets[u][idx]
                heapq.heappush(heap, (-timestamp, tweetId, u, idx))
        
        result = []

        while heap and len(result) < self.FEED_SIZE:

            neg_time, tweetId, u, idx = heapq.heappop(heap)
            result.append(tweetId)

            # Move pointer to next newest tweet if available
            if idx > 0:
                idx -= 1
                timestamp, tweetId = self.tweets[u][idx]
                heapq.heappush(heap, (-timestamp, tweetId, u, idx))
        
        return result

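    # overall: time complexity: O(1) for postTweet/follow/unfollow, O(F log F) for getNewsFeed (F = followees + 1)
    # overall: space complexity: O(T + E) for T stored tweets (at most 100 per user) and E follow relationships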
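
The index-pointer merge inside getNewsFeed can be looked at in isolation. The sketch below is illustrative only: `newest_k` and its `timelines` argument are hypothetical names that do not appear in the solution, and it assumes each timeline is a list of (timestamp, tweetId) pairs sorted oldest to newest.

```python
import heapq


def newest_k(timelines, k):
    """Return the tweetIds of the k newest tweets across all timelines.

    Assumes every timeline is sorted oldest -> newest, so the newest
    entry of each one sits at its last index.
    """
    heap = []
    for t, timeline in enumerate(timelines):
        if timeline:
            idx = len(timeline) - 1
            timestamp, tweet_id = timeline[idx]
            # Negate the timestamp: heapq is a min-heap, we want newest first
            heapq.heappush(heap, (-timestamp, tweet_id, t, idx))

    result = []
    while heap and len(result) < k:
        _, tweet_id, t, idx = heapq.heappop(heap)
        result.append(tweet_id)
        # Step that timeline's pointer back to its next-newest tweet
        if idx > 0:
            timestamp, next_id = timelines[t][idx - 1]
            heapq.heappush(heap, (-timestamp, next_id, t, idx - 1))
    return result


timelines = [
    [(1, 101), (4, 104)],
    [(2, 202), (3, 203), (6, 206)],
    [],
    [(5, 305)],
]
print(newest_k(timelines, 3))  # [206, 305, 104]
```

The heap never holds more than one entry per timeline, which is why the cost depends on the number of timelines rather than the total number of tweets.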

Solution 2: Store Tweets as Linked Lists + Merge K Sorted Lists - Heap/Heap

import heapq
from collections import defaultdict

class TweetNode:

    # IRL Use Case:
    # 'Celebrity' users with many tweets; avoids indexing overhead when fetching top 10.


    # Note:
    # Tweets: Linked list node for tweets, storing tweetId, timestamp, and next pointer
    # News Feed: MaxHeap merges the heads of k linked lists, picking the newest tweet each time to produce top 10.

    # Pros: Handles users with very large tweet histories efficiently.
    # Cons: Pointer overhead; more complex than simple list implementation.

    def __init__(self, tweetId, time):
        self.tweetId = tweetId
        self.time = time
        self.next = None

class Twitter:
    def __init__(self):

        # Note:
        # Stores tweets as linked lists.
        # News feed built by max-heap merging list heads.
        # Pros/Cons as above.

        # Map userId to head of linked list of tweets
        self.tweets = {}

        # Map userId to set of followed userIds
        self.followees = defaultdict(set)
        self.time = 0
        self.FEED_SIZE = 10

    def postTweet(self, userId: int, tweetId: int) -> None:
        
        # Note:
        # Prepend the new node, so each user's list runs newest to oldest.

        self.time += 1
        node = TweetNode(tweetId, self.time)
        node.next = self.tweets.get(userId, None)
        self.tweets[userId] = node

    def follow(self, followerId: int, followeeId: int) -> None:

        # Note:
        # Skip self-follows; getNewsFeed always adds the user's own tweets.

        if followerId != followeeId:
            self.followees[followerId].add(followeeId)

    def unfollow(self, followerId: int, followeeId: int) -> None:

        # Note:
        # discard() is a no-op when followeeId is not in the set, so no KeyError.

        self.followees[followerId].discard(followeeId)

    def getNewsFeed(self, userId: int) -> list[int]:

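        # Note:
        # K-way merge: seed the heap with each list head, then pop the newest
        # and push its successor until FEED_SIZE tweets are collected.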

        # Users to consider: self + followees
        users = self.followees[userId] | {userId}
        
        # Build a max heap of (-time, TweetNode) for heads of all tweet lists
        heap = []

        for u in users:

            if self.tweets.get(u):
                heapq.heappush(heap, (-self.tweets[u].time, self.tweets[u]))
        
        result = []

        while heap and len(result) < self.FEED_SIZE:

            neg_time, node = heapq.heappop(heap)
            result.append(node.tweetId)

            if node.next:
                heapq.heappush(heap, (-node.next.time, node.next))
        
        return result

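    # overall: time complexity: O(1) for postTweet/follow/unfollow, O(F log F) for getNewsFeed (F = followees + 1)
    # overall: space complexity: O(T + E) for T stored tweets (no cap here) and E follow relationships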
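
Solution 2 pushes (-time, node) tuples, which works because the global counter makes every timestamp unique. If two timestamps could ever tie, Python would fall back to comparing the node objects and raise a TypeError. A minimal sketch of one common workaround, an insertion counter used as a tie-breaker, follows; `Node` and `merge_heads` are hypothetical names used only for illustration.

```python
import heapq
from itertools import count


class Node:
    """Hypothetical stand-in for TweetNode: a singly linked list node."""

    def __init__(self, tweet_id, time, next_node=None):
        self.tweet_id = tweet_id
        self.time = time
        self.next = next_node


def merge_heads(heads, k):
    """Return up to k tweet ids, newest first, from newest-first linked lists."""
    tie = count()  # strictly increasing, so tuples never compare Node objects
    heap = []
    for head in heads:
        if head:
            heapq.heappush(heap, (-head.time, next(tie), head))

    result = []
    while heap and len(result) < k:
        _, _, node = heapq.heappop(heap)
        result.append(node.tweet_id)
        if node.next:
            heapq.heappush(heap, (-node.next.time, next(tie), node.next))
    return result


# Two lists whose heads share timestamp 5, which would break (-time, node) tuples
a = Node(11, 5, Node(10, 1))
b = Node(21, 5, Node(20, 3))
print(merge_heads([a, b], 3))  # [11, 21, 20]
```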

Solution 3: Store Tweets as Arrays/Deques + Dynamic Heap Merge - Heap/Heap

import heapq
from collections import defaultdict, deque

class Twitter:

    def __init__(self):

        # IRL Use Case: 
        # Most practical for IRL apps where MOST users have few tweets and follow a manageable number of accounts.
        # Same index-based merge as Solution 1; avoids the per-node pointer overhead of Solution 2.

        # Note:
        # Tweets: Stores tweets as deques per user, uses indices to access latest tweets.
        # News Feed: Max-heap merges the most recent tweets from each user for news feed.
        
        # Pros: Simple, practical, low overhead, works well with typical usage.
        # Cons: Slightly more complex than naive list append-only approach; still has heap overhead.

        # Map userId to deque of (timestamp, tweetId), oldest to newest
        self.tweets = defaultdict(deque)
        
        # Map userId to set of followed userIds
        self.followees = defaultdict(set)
        
        # Global timestamp for ordering
        self.time = 0
        self.FEED_SIZE = 10

    def postTweet(self, userId: int, tweetId: int) -> None:
        # Increment timestamp
        self.time += 1
        self.tweets[userId].append((self.time, tweetId))
        # Optional: limit stored tweets to last 100 per user
        if len(self.tweets[userId]) > 100:
            self.tweets[userId].popleft()

    def follow(self, followerId: int, followeeId: int) -> None:
        if followerId != followeeId:
            self.followees[followerId].add(followeeId)

    def unfollow(self, followerId: int, followeeId: int) -> None:
        self.followees[followerId].discard(followeeId)

    def getNewsFeed(self, userId: int) -> list[int]:
        # Users to pull tweets from: self + followees
        users = self.followees[userId] | {userId}
        heap = []

        # Initialize heap with most recent tweet of each user
        for u in users:
            if self.tweets[u]:
                idx = len(self.tweets[u]) - 1
                ts, tid = self.tweets[u][idx]
                heapq.heappush(heap, (-ts, tid, u, idx))

        result = []

        while heap and len(result) < self.FEED_SIZE:
            neg_ts, tid, u, idx = heapq.heappop(heap)
            result.append(tid)
            if idx > 0:
                idx -= 1
                ts, tid = self.tweets[u][idx]
                heapq.heappush(heap, (-ts, tid, u, idx))

        return result
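
For comparison, Python's standard library already ships a lazy k-way merge. The sketch below is not the approach used in the solutions above: it swaps the hand-written heap loop for heapq.merge with reverse=True. `feed_from` is a hypothetical helper, and it assumes each per-user deque holds (timestamp, tweetId) pairs oldest to newest, as in Solution 3.

```python
import heapq
from collections import deque
from itertools import islice


def feed_from(timelines, feed_size=10):
    """Merge per-user timelines newest-first and keep the first feed_size ids."""
    # reversed() walks each deque newest -> oldest; with reverse=True,
    # heapq.merge expects every input sorted largest -> smallest.
    newest_first = (reversed(t) for t in timelines)
    merged = heapq.merge(*newest_first, reverse=True)
    # merge is lazy, so islice stops after feed_size tweets are produced
    return [tweet_id for _, tweet_id in islice(merged, feed_size)]


alice = deque([(1, 5), (4, 8)])
bob = deque([(2, 6), (3, 7)])
print(feed_from([alice, bob], feed_size=3))  # [8, 7, 6]
```

Because the merge is lazy, only as many tweets as the feed needs are ever pulled from the per-user timelines.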

295. Find Median from Data Stream ::1:: - Hard

Topics: Two Pointers, Design, Sorting, Heap (Priority Queue), Data Stream

Intro

The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value, and the median is the mean of the two middle values.

For example, for arr = [2,3,4], the median is 3. For arr = [2,3], the median is (2 + 3) / 2 = 2.5.

Implement the MedianFinder class:

  1. MedianFinder() initializes the MedianFinder object.
  2. void addNum(int num) adds the integer num from the data stream to the data structure.
  3. double findMedian() returns the median of all elements so far. Answers within 10^-5 of the actual answer will be accepted.

Example

Input: ["MedianFinder", "addNum", "addNum", "findMedian", "addNum", "findMedian"]
       [[], [1], [2], [], [3], []]
Output: [null, null, null, 1.5, null, 2.0]

Constraints:

-10^5 ≤ num ≤ 10^5

There will be at least one element in the data structure before calling findMedian.

At most 5 * 10^4 calls will be made to addNum and findMedian

Abstraction

Find median value from a stream of data.

Space & Time Complexity

Solution | Time Complexity | Space Complexity | Time Remark | Space Remark
Bug | Error

Brute Force:

Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks
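
A plausible brute-force baseline, sketched here as an assumption rather than as the author's solution, keeps the numbers in a sorted Python list with bisect.insort. Insertion costs O(n) because elements shift, and the median is then an O(1) index lookup. `BruteMedianFinder` is a hypothetical name.

```python
import bisect


class BruteMedianFinder:
    def __init__(self):
        self.nums = []  # always kept in ascending order

    def addNum(self, num: int) -> None:
        # Binary search finds the slot in O(log n); the list shift costs O(n)
        bisect.insort(self.nums, num)

    def findMedian(self) -> float:
        n = len(self.nums)
        mid = n // 2
        if n % 2 == 1:
            return float(self.nums[mid])
        return (self.nums[mid - 1] + self.nums[mid]) / 2


mf = BruteMedianFinder()
mf.addNum(1)
mf.addNum(2)
print(mf.findMedian())  # 1.5
mf.addNum(3)
print(mf.findMedian())  # 2.0
```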

Find the Bug:

Solution 1: MaxHeap + MinHeap Median Finder - Heap/Heap

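import heapq

class MedianFinder:

    # Note:
    # Two heaps split the stream in half: maxHeap holds the smaller half, minHeap the larger half.
    # The median is then always at one or both heap roots.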

    def __init__(self):
        
        # MaxHeap stores smaller half,
        # all numbers less than or equal to the median
        self.maxHeap = []

        # MinHeap stores larger half,
        # all numbers greater than or equal to the median
        self.minHeap = []

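    # time complexity: O(log n) per call (a constant number of heap pushes/pops)
    # space complexity: O(1) extra per call, O(n) total across both heaps
    def addNum(self, num: int) -> None:

        # Note:
        # Two heaps track the lower and upper halves of the stream.
        # Push to maxHeap first, then fix ordering and sizes.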

        # Add new number to maxHeap
        heapq.heappush(self.maxHeap, -num)
        
        # Ensure every element in maxHeap is less than or equal to every element in minHeap:
        # if the largest of maxHeap exceeds the smallest of minHeap, move it to minHeap
        if (self.minHeap and (-self.maxHeap[0] > self.minHeap[0])):
            val = -heapq.heappop(self.maxHeap)
            heapq.heappush(self.minHeap, val)
        
        # Balance heaps:
        # maxHeap holds either the same number of elements as minHeap or exactly 1 more

        # if minHeap is larger, move minHeap root to maxHeap
        if len(self.maxHeap) < len(self.minHeap):

            # smallest of the larger half
            val = heapq.heappop(self.minHeap)
            # negate to store in the max-heap
            heapq.heappush(self.maxHeap, -val)

        # if maxHeap is larger by more than 1, move maxHeap root to minHeap
        elif len(self.maxHeap) > len(self.minHeap) + 1:

            # largest of the smaller half (undo the negation)
            val = -heapq.heappop(self.maxHeap)
            # store as-is in the min-heap
            heapq.heappush(self.minHeap, val)


    # time complexity: O(1) (only the heap roots are read)
    # space complexity: O(1)
    def findMedian(self) -> float:

        # Note:
        # Median retrieval logic:
        # 1. if odd -> root of MaxHeap (largest in smaller half)
        # 2. if even -> average of roots from both heaps

        # odd
        if len(self.maxHeap) > len(self.minHeap):
            return -self.maxHeap[0]
       
        # even
        average = (-self.maxHeap[0] + self.minHeap[0]) / 2

        return average
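
One way to gain confidence in the two-heap invariants is to compare against statistics.median on random input. The sketch below uses a slightly different but equivalent formulation (every value is routed through the max-heap into the min-heap, then sizes are rebalanced) rather than the conditional moves above. `TwoHeapMedian` is a hypothetical name and the random stream is made up for illustration.

```python
import heapq
import random
import statistics


class TwoHeapMedian:
    def __init__(self):
        self.low = []   # max-heap of the smaller half (values stored negated)
        self.high = []  # min-heap of the larger half

    def add(self, num):
        # Route the value through low so that max(low) <= min(high) holds
        heapq.heappush(self.low, -num)
        heapq.heappush(self.high, -heapq.heappop(self.low))
        # Keep len(low) == len(high) or len(low) == len(high) + 1
        if len(self.high) > len(self.low):
            heapq.heappush(self.low, -heapq.heappop(self.high))

    def median(self):
        if len(self.low) > len(self.high):
            return float(-self.low[0])
        return (-self.low[0] + self.high[0]) / 2


random.seed(0)
finder, seen = TwoHeapMedian(), []
for _ in range(200):
    x = random.randint(-10**5, 10**5)
    finder.add(x)
    seen.append(x)
    # statistics.median sorts the whole list, so it serves as a slow reference
    assert finder.median() == statistics.median(seen)
print("all medians match")
```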

Solution 2: Balanced BST - Heap/Heap

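from sortedcontainers import SortedList

class MedianFinder:
    def __init__(self):

        # Note:
        # SortedList (third-party sortedcontainers package) keeps elements in sorted order.
        # It stands in for a balanced BST, which Python's standard library lacks.
        self.sl = SortedList()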

    # time complexity: O(log n) (approximate, per the sortedcontainers docs)
    # space complexity: O(1) extra per call, O(n) total
    def addNum(self, num: int) -> None:

        # insert in sorted order
        self.sl.add(num)

    # time complexity: O(log n) (approximate, for indexing into SortedList)
    # space complexity: O(1)
    def findMedian(self) -> float:
        
        # len
        n = len(self.sl)

        # odd
        if n % 2 == 1:
            return self.sl[n // 2]

        # even
        average = (self.sl[n // 2 - 1] + self.sl[n // 2]) / 2

        return average


    # overall: time complexity: O(log n) per addNum and per findMedian
    # overall: space complexity: O(n)